From 65b03bd1384de2d5a341fd9a9864a15c7d79ad20 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Sat, 19 Jun 2021 19:03:29 +0800 Subject: [PATCH 01/12] HDDS-4928. Support container move in Replication Manager --- .../scm/container/ReplicationManager.java | 262 +++++++++++++++++- .../scm/container/TestReplicationManager.java | 166 ++++++++++- 2 files changed, 417 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 13f19f1fa9c9..7d336d8b0beb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -28,9 +28,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.StringJoiner; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -39,6 +41,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; @@ -48,6 +52,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; import org.apache.hadoop.hdds.scm.PlacementPolicy; @@ -134,6 +139,44 @@ public class ReplicationManager implements MetricsSource, SCMService { */ private final Map> inflightDeletion; + /** + * This is used for tracking container move commands + * which are not yet complete. + */ + private final Map> inflightMove; + + /** + * This is used for indicating the result of move option and + * the corresponding reason. this is useful for tracking + * the result of move option + */ + enum MoveResult { + // both replication and deletion are completed + COMPLETED, + // replication fail because of timeout + REPLICATION_FAIL_TIME_OUT, + // replication fail because node is unhealthy + REPLICATION_FAIL_NODE_UNHEALTHY, + // replication succeed, but deletion fail because of timeout + DELETION_FAIL_TIME_OUT, + // replication succeed, but deletion fail because because + // node is unhealthy + DELETION_FAIL_NODE_UNHEALTHY, + // replication succeed, but if we delete the container from + // the source datanode , the policy(eg, replica num or + // rack location) will not be satisfied, so we should not delete + // the container + DELETE_FAIL_POLICY + } + + /** + * This is used for tracking container move commands + * which are not yet complete. + */ + private final Map> inflightMoveFuture; + /** * ReplicationManager specific configuration. */ @@ -191,6 +234,8 @@ public ReplicationManager(final ConfigurationSource conf, this.running = false; this.inflightReplication = new ConcurrentHashMap<>(); this.inflightDeletion = new ConcurrentHashMap<>(); + this.inflightMove = new ConcurrentHashMap<>(); + this.inflightMoveFuture = new ConcurrentHashMap<>(); this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum(); this.waitTimeInMillis = conf.getTimeDuration( @@ -210,7 +255,6 @@ public ReplicationManager(final ConfigurationSource conf, */ @Override public synchronized void start() { - if (!isRunning()) { DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME, "SCM Replication manager (closed container replication) related " @@ -260,6 +304,9 @@ public synchronized void stop() { LOG.info("Stopping Replication Monitor Thread."); inflightReplication.clear(); inflightDeletion.clear(); + //TODO: replicate inflight move through ratis + inflightMove.clear(); + inflightMoveFuture.clear(); running = false; DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME); notifyAll(); @@ -458,6 +505,37 @@ private void updateInflightAction(final ContainerInfo container, if (health != NodeState.HEALTHY || a.time < deadline || filter.test(a)) { iter.remove(); + + if (inflightMove.containsKey(id)) { + boolean isInflightReplication = + inflightActions.equals(inflightReplication); + //if replication is completed , nothing to do + if (!(isInflightReplication && filter.test(a))) { + if (isInflightReplication) { + if (health != NodeState.HEALTHY) { + inflightMoveFuture.get(id).complete( + MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); + } else { + inflightMoveFuture.get(id).complete( + MoveResult.REPLICATION_FAIL_TIME_OUT); + } + } else { + if (health != NodeState.HEALTHY) { + inflightMoveFuture.get(id).complete( + MoveResult.DELETION_FAIL_NODE_UNHEALTHY); + } else if (a.time < deadline) { + inflightMoveFuture.get(id).complete( + MoveResult.DELETION_FAIL_TIME_OUT); + } else { + inflightMoveFuture.get(id).complete( + MoveResult.COMPLETED); + } + } + inflightMove.remove(id); + inflightMoveFuture.remove(id); + } + } + } } catch (NodeNotFoundException e) { // Should not happen, but if it does, just remove the action as the @@ -471,6 +549,127 @@ private void updateInflightAction(final ContainerInfo container, } } + /** + * add a move action for a given container. + * + * @param cid Container to move + * @param srcDn datanode to move from + * @param targetDn datanode to move to + */ + public Optional> move(ContainerID cid, + DatanodeDetails srcDn, DatanodeDetails targetDn) + throws ContainerNotFoundException, NodeNotFoundException { + LOG.info("receive a move requset about container {} , from {} to {}", + cid, srcDn.getUuid(), targetDn.getUuid()); + Optional> ret = Optional.empty(); + if (!isRunning()) { + LOG.info("Replication Manager in not running. please start it first"); + return ret; + } + + /* + * make sure the flowing conditions are met: + * 1 the given two datanodes are in healthy state + * 2 the given container exists on the given source datanode + * 3 the given container does not exist on the given target datanode + * 4 the given container is in closed state + * 5 the giver container is not taking any inflight action + * 6 the given two datanodes are in IN_SERVICE state + * + * move is a combination of two steps : replication and deletion. + * if the conditions above are all met, then we take a conservative + * strategy here : replication can always be executed, but the execution + * of deletion always depends on placement policy + */ + + NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn); + NodeState healthStat = currentNodeStat.getHealth(); + NodeOperationalState operationalState = + currentNodeStat.getOperationalState(); + if (healthStat != NodeState.HEALTHY) { + LOG.info("given source datanode is in {} state, " + + "not in HEALTHY state", healthStat); + return ret; + } + if (operationalState != NodeOperationalState.IN_SERVICE) { + LOG.info("given source datanode is in {} state, " + + "not in IN_SERVICE state", operationalState); + return ret; + } + + currentNodeStat = nodeManager.getNodeStatus(targetDn); + healthStat = currentNodeStat.getHealth(); + operationalState = currentNodeStat.getOperationalState(); + if (healthStat != NodeState.HEALTHY) { + LOG.info("given target datanode is in {} state, " + + "not in HEALTHY state", healthStat); + return ret; + } + if (operationalState != NodeOperationalState.IN_SERVICE) { + LOG.info("given target datanode is in {} state, " + + "not in IN_SERVICE state", operationalState); + return ret; + } + + // we need to synchronize on ContainerInfo, since it is + // shared by ICR/FCR handler and this.processContainer + // TODO: use a Read lock after introducing a RW lock into ContainerInfo + ContainerInfo cif = containerManager.getContainer(cid); + synchronized (cif) { + final Set replicas = containerManager + .getContainerReplicas(cid).stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toSet()); + if (replicas.contains(targetDn)) { + LOG.info("given container exists in the target Datanode"); + return ret; + } + if (!replicas.contains(srcDn)) { + LOG.info("given container does not exist in the source Datanode"); + return ret; + } + + /* + * the reason why the given container should not be taking any inflight + * action is that: if the given container is being replicated or deleted, + * the num of its replica is not deterministic, so move operation issued + * by balancer may cause a nondeterministic result, so we should drop + * this option for this time. + * */ + + if (inflightReplication.containsKey(cid)) { + LOG.info("given container is in inflight replication"); + return ret; + } + if (inflightDeletion.containsKey(cid)) { + LOG.info("given container is in inflight deletion"); + return ret; + } + + /* + * here, no need to see whether cid is in inflightMove, because + * these three map are all synchronized on ContainerInfo, if cid + * is in infligtMove , it must now being replicated or deleted, + * so it must be in inflightReplication or in infligthDeletion. + * thus, if we can not find cid in both of them , this cid must + * not be in inflightMove. + */ + + LifeCycleState currentContainerStat = cif.getState(); + if (currentContainerStat != LifeCycleState.CLOSED) { + LOG.info("given container is in {} state, " + + "not in CLOSED state", currentContainerStat); + return ret; + } + inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn)); + ret = Optional.of(inflightMoveFuture + .computeIfAbsent(cid, k -> new CompletableFuture<>())); + sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn)); + } + + return ret; + } + /** * Returns the number replica which are pending creation for the given * container ID. @@ -862,16 +1061,55 @@ private void handleOverReplicatedContainer(final ContainerInfo container, break; } } + + eligibleReplicas.removeAll(unhealthyReplicas); + boolean isInMove = inflightMove.containsKey(id); + boolean isSourceDnInReplicaSet = false; + boolean isTargetDnInReplicaSet = false; + + if (isInMove) { + Pair movePair = + inflightMove.get(id); + final DatanodeDetails sourceDN = movePair.getKey(); + isSourceDnInReplicaSet = eligibleReplicas.stream() + .anyMatch(r -> r.getDatanodeDetails().equals(sourceDN)); + isTargetDnInReplicaSet = eligibleReplicas.stream() + .anyMatch(r -> r.getDatanodeDetails() + .equals(movePair.getValue())); + int sourceDnPos = 0; + for (int i = 0; i < eligibleReplicas.size(); i++) { + if (eligibleReplicas.get(i).getDatanodeDetails() + .equals(sourceDN)) { + sourceDnPos = i; + break; + } + } + if (isTargetDnInReplicaSet) { + // if the container is in inflightMove and target datanode is + // included in the replicas, then swap the source datanode to + // first of the replica list if exists, so the source datanode + // will be first removed if possible. + eligibleReplicas.add(0, eligibleReplicas.remove(sourceDnPos)); + } else { + // a container replica that being moved should not be removed. + // if the container is in inflightMove and target datanode is not + // included in the replicas, then swap the source datanode to the + // last of the replica list, so the source datanode will not + // be removed. + eligibleReplicas.add(eligibleReplicas.remove(sourceDnPos)); + } + } + // After removing all unhealthy replicas, if the container is still over // replicated then we need to check if it is already mis-replicated. // If it is, we do no harm by removing excess replicas. However, if it is // not mis-replicated, then we can only remove replicas if they don't // make the container become mis-replicated. if (excess > 0) { - eligibleReplicas.removeAll(unhealthyReplicas); Set eligibleSet = new HashSet<>(eligibleReplicas); ContainerPlacementStatus ps = getPlacementStatus(eligibleSet, replicationFactor); + for (ContainerReplica r : eligibleReplicas) { if (excess <= 0) { break; @@ -900,6 +1138,18 @@ private void handleOverReplicatedContainer(final ContainerInfo container, "violating the placement policy", container, excess); } } + + if (isInMove && isSourceDnInReplicaSet && isTargetDnInReplicaSet) { + // if source and target datanode are both in the replicaset, + // but we can not delete source datanode for now (e.g., + // there is only 3 replicas or not policy-statisfied , etc.), + // we just complete the future without sending a delete command. + LOG.info("can not remove source replica after successfully " + + "replicated to target datanode"); + inflightMoveFuture.get(id).complete(MoveResult.DELETE_FAIL_POLICY); + inflightMove.remove(id); + inflightMoveFuture.remove(id); + } } } @@ -913,7 +1163,8 @@ private void handleOverReplicatedContainer(final ContainerInfo container, private ContainerPlacementStatus getPlacementStatus( Set replicas, int replicationFactor) { List replicaDns = replicas.stream() - .map(c -> c.getDatanodeDetails()).collect(Collectors.toList()); + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); return containerPlacement.validateContainerPlacement( replicaDns, replicationFactor); } @@ -1153,6 +1404,8 @@ public void getMetrics(MetricsCollector collector, boolean all) { inflightReplication.size()) .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION, inflightDeletion.size()) + .addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE, + inflightMove.size()) .endRecord(); } @@ -1248,7 +1501,8 @@ public int getMaintenanceReplicaMinimum() { public enum ReplicationManagerMetrics implements MetricsInfo { INFLIGHT_REPLICATION("Tracked inflight container replication requests."), - INFLIGHT_DELETION("Tracked inflight container deletion requests."); + INFLIGHT_DELETION("Tracked inflight container deletion requests."), + INFLIGHT_MOVE("Tracked inflight container move requests."); private final String desc; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index 7d3d45726dc0..f3c480a7401a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -68,11 +68,14 @@ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.createDatanodeDetails; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; @@ -650,7 +653,7 @@ public void testHealthyClosedContainer() throws SCMException, ContainerNotFoundException, InterruptedException { final ContainerInfo container = getContainer(LifeCycleState.CLOSED); final ContainerID id = container.containerID(); - final Set replicas = getReplicas(id, CLOSED, + final Set replicas = getReplicas(id, State.CLOSED, randomDatanodeDetails(), randomDatanodeDetails(), randomDatanodeDetails()); @@ -1096,6 +1099,143 @@ public void testUnderReplicatedNotHealthySource() assertReplicaScheduled(0); } + /** + * before Replication Manager generates a completablefuture for a move option, + * some Prerequisites should be met. + */ + + @Test + public void testMovePrerequisites() + throws SCMException, ContainerNotFoundException, + NodeNotFoundException, InterruptedException { + + //all conditions is met + final ContainerInfo container = createContainer(LifeCycleState.CLOSED); + ContainerID id = container.containerID(); + ContainerReplica dn1 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + ContainerReplica dn2 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); + ContainerReplica dn4 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + //replication manager will not generate any inflight action + replicationManager.processContainersNow(); + //move should succeed + Assert.assertTrue(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + + + //the above move is executed successfully, so there may be some item in + //inflightReplication or inflightDeletion. here we stop replication manager + //to clear these states, which may impact the tests below. + //we don't need a running replicationManamger now + resetReplicationManager(); + + //container in not in CLOSED state + container.setState(LifeCycleState.CLOSING); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + container.setState(LifeCycleState.OPEN); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + container.setState(LifeCycleState.QUASI_CLOSED); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + container.setState(LifeCycleState.DELETING); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + container.setState(LifeCycleState.CLOSED); + + //Node is not in healthy state + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(IN_SERVICE, STALE)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(IN_SERVICE, DEAD)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(IN_SERVICE, HEALTHY)); + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, DEAD)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); + + //Node is not in IN_SERVICE state + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(DECOMMISSIONING, STALE)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(DECOMMISSIONED, DEAD)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(ENTERING_MAINTENANCE, HEALTHY)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(IN_MAINTENANCE, HEALTHY)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(IN_SERVICE, HEALTHY)); + nodeManager.setNodeStatus(dn3, new NodeStatus(DECOMMISSIONING, STALE)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn3, new NodeStatus(DECOMMISSIONED, DEAD)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn3, + new NodeStatus(ENTERING_MAINTENANCE, HEALTHY)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_MAINTENANCE, HEALTHY)); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); + + //container exists in target datanode + Assert.assertFalse(replicationManager.move(id, dn1.getDatanodeDetails(), + dn2.getDatanodeDetails()).isPresent()); + + //container does not exist in source datanode + Assert.assertFalse(replicationManager.move(id, dn3, + dn2.getDatanodeDetails()).isPresent()); + + //make container over relplicated to test the + // case that thatcontainer is in inflightDeletion + ContainerReplica dn5 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED); + ContainerReplica dn6 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED); + replicationManager.processContainersNow(); + //waiting for inflightDeletion generation + Thread.sleep(500); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + + //clear inflight actions + resetReplicationManager(); + + //make the replica num be 2 to test the case + // that container is in inflightReplication + containerStateManager.removeContainerReplica(id, dn6); + containerStateManager.removeContainerReplica(id, dn5); + containerStateManager.removeContainerReplica(id, dn4); + //replication manager should generate inflightReplication + replicationManager.processContainersNow(); + //waiting for inflightReplication generation + Thread.sleep(500); + Assert.assertFalse(replicationManager.move(id, + dn1.getDatanodeDetails(), dn3).isPresent()); + } + private ContainerInfo createContainer(LifeCycleState containerState) throws SCMException { final ContainerInfo container = getContainer(containerState); @@ -1103,21 +1243,32 @@ private ContainerInfo createContainer(LifeCycleState containerState) return container; } - private ContainerReplica addReplica(ContainerInfo container, - NodeStatus nodeStatus, State replicaState) - throws ContainerNotFoundException { + private DatanodeDetails addNode(NodeStatus nodeStatus) { DatanodeDetails dn = randomDatanodeDetails(); dn.setPersistedOpState(nodeStatus.getOperationalState()); dn.setPersistedOpStateExpiryEpochSec( nodeStatus.getOpStateExpiryEpochSeconds()); nodeManager.register(dn, nodeStatus); + return dn; + } + + private void resetReplicationManager() throws InterruptedException { + replicationManager.stop(); + Thread.sleep(500); + replicationManager.start(); + } + + private ContainerReplica addReplica(ContainerInfo container, + NodeStatus nodeStatus, State replicaState) + throws ContainerNotFoundException { + DatanodeDetails dn = addNode(nodeStatus); // Using the same originID for all replica in the container set. If each // replica has a unique originID, it causes problems in ReplicationManager // when processing over-replicated containers. final UUID originNodeId = UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID())); final ContainerReplica replica = getReplicas( - container.containerID(), CLOSED, 1000L, originNodeId, dn); + container.containerID(), replicaState, 1000L, originNodeId, dn); containerStateManager .updateContainerReplica(container.containerID(), replica); return replica; @@ -1215,4 +1366,5 @@ public boolean matches(Object argument) { return function.apply(argument); } } -} \ No newline at end of file +} + From f5631ec2cc09ce9996f82c93cceb94bc59414a8d Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Tue, 29 Jun 2021 17:51:55 +0800 Subject: [PATCH 02/12] update --- .../scm/container/ReplicationManager.java | 318 ++++++++++++------ .../scm/container/TestReplicationManager.java | 128 ------- 2 files changed, 212 insertions(+), 234 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 7d336d8b0beb..2fc2bd6e40f3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -28,7 +28,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.StringJoiner; import java.util.UUID; @@ -154,8 +153,24 @@ public class ReplicationManager implements MetricsSource, SCMService { enum MoveResult { // both replication and deletion are completed COMPLETED, + // RM is not running + RM_NOT_RUNNING, + // source datanode disappear somehow + REPLICATION_FAIL_SOURCE_DISAPPEAR, + // replication fail because the container does not exist in src + REPLICATION_FAIL_NOT_EXIST_IN_SOURCE, + // replication fail because the container exists in target + REPLICATION_FAIL_EXIST_IN_TARGET, + // replication fail because the container is not cloesed + REPLICATION_FAIL_CONTAINER_NOT_CLOSED, + // replication fail because the container is in inflightDeletion + REPLICATION_FAIL_INFLIGHT_DELETION, + // replication fail because the container is in inflightReplication + REPLICATION_FAIL_INFLIGHT_REPLICATION, // replication fail because of timeout REPLICATION_FAIL_TIME_OUT, + // replication fail because of node is not in service + REPLICATION_FAIL_NODE_NOT_IN_SERVICE, // replication fail because node is unhealthy REPLICATION_FAIL_NODE_UNHEALTHY, // replication succeed, but deletion fail because of timeout @@ -167,7 +182,11 @@ enum MoveResult { // the source datanode , the policy(eg, replica num or // rack location) will not be satisfied, so we should not delete // the container - DELETE_FAIL_POLICY + DELETE_FAIL_POLICY, + //unexpected action, remove src at inflightReplication + UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION, + //unexpected action, remove target at inflightDeletion + UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION } /** @@ -502,40 +521,13 @@ private void updateInflightAction(final ContainerInfo container, InflightAction a = iter.next(); NodeState health = nodeManager.getNodeStatus(a.datanode) .getHealth(); - if (health != NodeState.HEALTHY || a.time < deadline - || filter.test(a)) { + boolean isUnhealthy = health != NodeState.HEALTHY; + boolean isCompleted = filter.test(a); + boolean isTimeout = a.time < deadline; + if (isUnhealthy || isTimeout || isCompleted) { iter.remove(); - - if (inflightMove.containsKey(id)) { - boolean isInflightReplication = - inflightActions.equals(inflightReplication); - //if replication is completed , nothing to do - if (!(isInflightReplication && filter.test(a))) { - if (isInflightReplication) { - if (health != NodeState.HEALTHY) { - inflightMoveFuture.get(id).complete( - MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); - } else { - inflightMoveFuture.get(id).complete( - MoveResult.REPLICATION_FAIL_TIME_OUT); - } - } else { - if (health != NodeState.HEALTHY) { - inflightMoveFuture.get(id).complete( - MoveResult.DELETION_FAIL_NODE_UNHEALTHY); - } else if (a.time < deadline) { - inflightMoveFuture.get(id).complete( - MoveResult.DELETION_FAIL_TIME_OUT); - } else { - inflightMoveFuture.get(id).complete( - MoveResult.COMPLETED); - } - } - inflightMove.remove(id); - inflightMoveFuture.remove(id); - } - } - + updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout, + id, a.datanode, inflightActions); } } catch (NodeNotFoundException e) { // Should not happen, but if it does, just remove the action as the @@ -549,6 +541,93 @@ private void updateInflightAction(final ContainerInfo container, } } + /** + * update inflight move if needed. + * + * @param isUnhealthy is the datanode unhealthy + * @param isCompleted is the action completed + * @param isTimeout is the action timeout + * @param id Container to update + * @param dn datanode which is removed from the inflightActions + * @param inflightActions inflightReplication (or) inflightDeletion + */ + private void updateMoveIfNeeded(final boolean isUnhealthy, + final boolean isCompleted, final boolean isTimeout, + final ContainerID id, final DatanodeDetails dn, + final Map> inflightActions) { + // make sure inflightMove contains the container + if (!inflightMove.containsKey(id)) { + return; + } + + // make sure the datanode , which is removed from inflightActions, + // is source or target datanode. + Pair kv = inflightMove.get(id); + final boolean isSource = kv.getKey().equals(dn); + final boolean isTarget = kv.getValue().equals(dn); + if (!isSource && !isTarget) { + return; + } + final boolean isInflightReplication = + inflightActions.equals(inflightReplication); + + /* + * there are some case: + ********************************************************** + * * InflightReplication * InflightDeletion * + ********************************************************** + *source removed* unexpected * expected * + ********************************************************** + *target removed* expected * unexpected * + ********************************************************** + * unexpected action may happen somehow. to make it deterministic, + * if unexpected action happens, we just fail the completableFuture. + */ + + if (isSource && isInflightReplication) { + inflightMoveFuture.get(id).complete( + MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION); + inflightMove.remove(id); + inflightMoveFuture.remove(id); + return; + } + + if (isTarget && !isInflightReplication) { + inflightMoveFuture.get(id).complete( + MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION); + inflightMove.remove(id); + inflightMoveFuture.remove(id); + return; + } + + //if replication is completed , nothing to do + if (!(isInflightReplication && isCompleted)) { + if (isInflightReplication) { + if (isUnhealthy) { + inflightMoveFuture.get(id).complete( + MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); + } else { + inflightMoveFuture.get(id).complete( + MoveResult.REPLICATION_FAIL_TIME_OUT); + } + } else { + if (isUnhealthy) { + inflightMoveFuture.get(id).complete( + MoveResult.DELETION_FAIL_NODE_UNHEALTHY); + } else if (isTimeout) { + inflightMoveFuture.get(id).complete( + MoveResult.DELETION_FAIL_TIME_OUT); + } else { + inflightMoveFuture.get(id).complete( + MoveResult.COMPLETED); + } + } + inflightMove.remove(id); + inflightMoveFuture.remove(id); + } + } + /** * add a move action for a given container. * @@ -556,14 +635,12 @@ private void updateInflightAction(final ContainerInfo container, * @param srcDn datanode to move from * @param targetDn datanode to move to */ - public Optional> move(ContainerID cid, + public CompletableFuture move(ContainerID cid, DatanodeDetails srcDn, DatanodeDetails targetDn) throws ContainerNotFoundException, NodeNotFoundException { - LOG.info("receive a move requset about container {} , from {} to {}", - cid, srcDn.getUuid(), targetDn.getUuid()); - Optional> ret = Optional.empty(); + CompletableFuture ret = new CompletableFuture<>(); if (!isRunning()) { - LOG.info("Replication Manager in not running. please start it first"); + ret.complete(MoveResult.RM_NOT_RUNNING); return ret; } @@ -587,13 +664,11 @@ public Optional> move(ContainerID cid, NodeOperationalState operationalState = currentNodeStat.getOperationalState(); if (healthStat != NodeState.HEALTHY) { - LOG.info("given source datanode is in {} state, " + - "not in HEALTHY state", healthStat); + ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); return ret; } if (operationalState != NodeOperationalState.IN_SERVICE) { - LOG.info("given source datanode is in {} state, " + - "not in IN_SERVICE state", operationalState); + ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE); return ret; } @@ -601,13 +676,11 @@ public Optional> move(ContainerID cid, healthStat = currentNodeStat.getHealth(); operationalState = currentNodeStat.getOperationalState(); if (healthStat != NodeState.HEALTHY) { - LOG.info("given target datanode is in {} state, " + - "not in HEALTHY state", healthStat); + ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); return ret; } if (operationalState != NodeOperationalState.IN_SERVICE) { - LOG.info("given target datanode is in {} state, " + - "not in IN_SERVICE state", operationalState); + ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE); return ret; } @@ -621,11 +694,11 @@ public Optional> move(ContainerID cid, .map(ContainerReplica::getDatanodeDetails) .collect(Collectors.toSet()); if (replicas.contains(targetDn)) { - LOG.info("given container exists in the target Datanode"); + ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET); return ret; } if (!replicas.contains(srcDn)) { - LOG.info("given container does not exist in the source Datanode"); + ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE); return ret; } @@ -638,11 +711,11 @@ public Optional> move(ContainerID cid, * */ if (inflightReplication.containsKey(cid)) { - LOG.info("given container is in inflight replication"); + ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION); return ret; } if (inflightDeletion.containsKey(cid)) { - LOG.info("given container is in inflight deletion"); + ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION); return ret; } @@ -657,16 +730,15 @@ public Optional> move(ContainerID cid, LifeCycleState currentContainerStat = cif.getState(); if (currentContainerStat != LifeCycleState.CLOSED) { - LOG.info("given container is in {} state, " + - "not in CLOSED state", currentContainerStat); + ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED); return ret; } inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn)); - ret = Optional.of(inflightMoveFuture - .computeIfAbsent(cid, k -> new CompletableFuture<>())); + inflightMoveFuture.putIfAbsent(cid, ret); sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn)); } - + LOG.info("receive a move requset about container {} , from {} to {}", + cid, srcDn.getUuid(), targetDn.getUuid()); return ret; } @@ -1085,60 +1157,40 @@ private void handleOverReplicatedContainer(final ContainerInfo container, } } if (isTargetDnInReplicaSet) { - // if the container is in inflightMove and target datanode is - // included in the replicas, then swap the source datanode to - // first of the replica list if exists, so the source datanode - // will be first removed if possible. - eligibleReplicas.add(0, eligibleReplicas.remove(sourceDnPos)); - } else { - // a container replica that being moved should not be removed. - // if the container is in inflightMove and target datanode is not - // included in the replicas, then swap the source datanode to the - // last of the replica list, so the source datanode will not - // be removed. - eligibleReplicas.add(eligibleReplicas.remove(sourceDnPos)); - } - } - - // After removing all unhealthy replicas, if the container is still over - // replicated then we need to check if it is already mis-replicated. - // If it is, we do no harm by removing excess replicas. However, if it is - // not mis-replicated, then we can only remove replicas if they don't - // make the container become mis-replicated. - if (excess > 0) { - Set eligibleSet = new HashSet<>(eligibleReplicas); - ContainerPlacementStatus ps = - getPlacementStatus(eligibleSet, replicationFactor); - - for (ContainerReplica r : eligibleReplicas) { - if (excess <= 0) { - break; + if (isSourceDnInReplicaSet) { + // if the target is present, and source disappears somehow, + // we can consider move is successful. + inflightMoveFuture.get(id).complete(MoveResult.COMPLETED); + inflightMove.remove(id); + inflightMoveFuture.remove(id); + } else { + // if the container is in inflightMove and target datanode is + // included in the replicas, then swap the source datanode to + // first of the replica list if exists, so the source datanode + // will be first removed if possible. + eligibleReplicas.add(0, eligibleReplicas.remove(sourceDnPos)); } - // First remove the replica we are working on from the set, and then - // check if the set is now mis-replicated. - eligibleSet.remove(r); - ContainerPlacementStatus nowPS = - getPlacementStatus(eligibleSet, replicationFactor); - if ((!ps.isPolicySatisfied() - && nowPS.actualPlacementCount() == ps.actualPlacementCount()) - || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) { - // Remove the replica if the container was already unsatisfied - // and losing this replica keep actual placement count unchanged. - // OR if losing this replica still keep satisfied - sendDeleteCommand(container, r.getDatanodeDetails(), true); - excess -= 1; - continue; + } else { + if (isSourceDnInReplicaSet) { + // a container replica that being moved should not be removed. + // if the container is in inflightMove and target datanode is not + // included in the replicas, then swap the source datanode to the + // last of the replica list, so the source datanode will not + // be removed. + eligibleReplicas.add(eligibleReplicas.remove(sourceDnPos)); + } else { + // if the target is not present, and source disappears somehow, + // we fail the completeableFuture directly. + inflightMoveFuture.get(id).complete( + MoveResult.REPLICATION_FAIL_SOURCE_DISAPPEAR); + inflightMove.remove(id); + inflightMoveFuture.remove(id); } - // If we decided not to remove this replica, put it back into the set - eligibleSet.add(r); - } - if (excess > 0) { - LOG.info("The container {} is over replicated with {} excess " + - "replica. The excess replicas cannot be removed without " + - "violating the placement policy", container, excess); } } + removeExcessReplicasIfNeeded(excess, container, eligibleReplicas); + if (isInMove && isSourceDnInReplicaSet && isTargetDnInReplicaSet) { // if source and target datanode are both in the replicaset, // but we can not delete source datanode for now (e.g., @@ -1153,6 +1205,60 @@ private void handleOverReplicatedContainer(final ContainerInfo container, } } + + /** + * remove execess replicas if needed, replicationFactor and placement policy + * will be take into consideration. + * + * @param excess the excess number after subtracting replicationFactor + * @param container ContainerInfo + * @param eligibleReplicas An list of replicas, which may have excess replicas + */ + private void removeExcessReplicasIfNeeded(int excess, + final ContainerInfo container, + final List eligibleReplicas) { + // After removing all unhealthy replicas, if the container is still over + // replicated then we need to check if it is already mis-replicated. + // If it is, we do no harm by removing excess replicas. However, if it is + // not mis-replicated, then we can only remove replicas if they don't + // make the container become mis-replicated. + if (excess > 0) { + Set eligibleSet = new HashSet<>(eligibleReplicas); + final int replicationFactor = + container.getReplicationConfig().getRequiredNodes(); + ContainerPlacementStatus ps = + getPlacementStatus(eligibleSet, replicationFactor); + + for (ContainerReplica r : eligibleReplicas) { + if (excess <= 0) { + break; + } + // First remove the replica we are working on from the set, and then + // check if the set is now mis-replicated. + eligibleSet.remove(r); + ContainerPlacementStatus nowPS = + getPlacementStatus(eligibleSet, replicationFactor); + if ((!ps.isPolicySatisfied() + && nowPS.actualPlacementCount() == ps.actualPlacementCount()) + || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) { + // Remove the replica if the container was already unsatisfied + // and losing this replica keep actual placement count unchanged. + // OR if losing this replica still keep satisfied + sendDeleteCommand(container, r.getDatanodeDetails(), true); + excess -= 1; + continue; + } + // If we decided not to remove this replica, put it back into the set + eligibleSet.add(r); + } + if (excess > 0) { + LOG.info("The container {} is over replicated with {} excess " + + "replica. The excess replicas cannot be removed without " + + "violating the placement policy", container, excess); + } + } + } + /** * Given a set of ContainerReplica, transform it to a list of DatanodeDetails * and then check if the list meets the container placement policy. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index f3c480a7401a..cf2593db819e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -68,10 +68,8 @@ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.createDatanodeDetails; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; import static org.apache.hadoop.hdds.protocol.proto @@ -1108,132 +1106,6 @@ public void testUnderReplicatedNotHealthySource() public void testMovePrerequisites() throws SCMException, ContainerNotFoundException, NodeNotFoundException, InterruptedException { - - //all conditions is met - final ContainerInfo container = createContainer(LifeCycleState.CLOSED); - ContainerID id = container.containerID(); - ContainerReplica dn1 = addReplica(container, - new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); - ContainerReplica dn2 = addReplica(container, - new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); - DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); - ContainerReplica dn4 = addReplica(container, - new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); - //replication manager will not generate any inflight action - replicationManager.processContainersNow(); - //move should succeed - Assert.assertTrue(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - - - //the above move is executed successfully, so there may be some item in - //inflightReplication or inflightDeletion. here we stop replication manager - //to clear these states, which may impact the tests below. - //we don't need a running replicationManamger now - resetReplicationManager(); - - //container in not in CLOSED state - container.setState(LifeCycleState.CLOSING); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - container.setState(LifeCycleState.OPEN); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - container.setState(LifeCycleState.QUASI_CLOSED); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - container.setState(LifeCycleState.DELETING); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - container.setState(LifeCycleState.CLOSED); - - //Node is not in healthy state - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(IN_SERVICE, STALE)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(IN_SERVICE, DEAD)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(IN_SERVICE, HEALTHY)); - nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, DEAD)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); - - //Node is not in IN_SERVICE state - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(DECOMMISSIONING, STALE)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(DECOMMISSIONED, DEAD)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(ENTERING_MAINTENANCE, HEALTHY)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(IN_MAINTENANCE, HEALTHY)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn1.getDatanodeDetails(), - new NodeStatus(IN_SERVICE, HEALTHY)); - nodeManager.setNodeStatus(dn3, new NodeStatus(DECOMMISSIONING, STALE)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn3, new NodeStatus(DECOMMISSIONED, DEAD)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn3, - new NodeStatus(ENTERING_MAINTENANCE, HEALTHY)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn3, new NodeStatus(IN_MAINTENANCE, HEALTHY)); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); - - //container exists in target datanode - Assert.assertFalse(replicationManager.move(id, dn1.getDatanodeDetails(), - dn2.getDatanodeDetails()).isPresent()); - - //container does not exist in source datanode - Assert.assertFalse(replicationManager.move(id, dn3, - dn2.getDatanodeDetails()).isPresent()); - - //make container over relplicated to test the - // case that thatcontainer is in inflightDeletion - ContainerReplica dn5 = addReplica(container, - new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED); - ContainerReplica dn6 = addReplica(container, - new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED); - replicationManager.processContainersNow(); - //waiting for inflightDeletion generation - Thread.sleep(500); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); - - //clear inflight actions - resetReplicationManager(); - - //make the replica num be 2 to test the case - // that container is in inflightReplication - containerStateManager.removeContainerReplica(id, dn6); - containerStateManager.removeContainerReplica(id, dn5); - containerStateManager.removeContainerReplica(id, dn4); - //replication manager should generate inflightReplication - replicationManager.processContainersNow(); - //waiting for inflightReplication generation - Thread.sleep(500); - Assert.assertFalse(replicationManager.move(id, - dn1.getDatanodeDetails(), dn3).isPresent()); } private ContainerInfo createContainer(LifeCycleState containerState) From f94432e77dd3b3e9086fd753a42b8d8244921f19 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Mon, 5 Jul 2021 21:15:51 +0800 Subject: [PATCH 03/12] add check for replicas + target - src --- .../scm/container/ReplicationManager.java | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 2fc2bd6e40f3..0565e724b24e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -183,6 +183,8 @@ enum MoveResult { // rack location) will not be satisfied, so we should not delete // the container DELETE_FAIL_POLICY, + // replicas + target - src does not satisfy placement policy + PLACEMENT_POLICY_NOT_SATISFIED, //unexpected action, remove src at inflightReplication UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION, //unexpected action, remove target at inflightDeletion @@ -652,6 +654,8 @@ public CompletableFuture move(ContainerID cid, * 4 the given container is in closed state * 5 the giver container is not taking any inflight action * 6 the given two datanodes are in IN_SERVICE state + * 7 {Existing replicas + Target_Dn - Source_Dn} satisfies + * the placement policy * * move is a combination of two steps : replication and deletion. * if the conditions above are all met, then we take a conservative @@ -689,8 +693,9 @@ public CompletableFuture move(ContainerID cid, // TODO: use a Read lock after introducing a RW lock into ContainerInfo ContainerInfo cif = containerManager.getContainer(cid); synchronized (cif) { - final Set replicas = containerManager - .getContainerReplicas(cid).stream() + final Set currentReplicas = containerManager + .getContainerReplicas(cid); + final Set replicas = currentReplicas.stream() .map(ContainerReplica::getDatanodeDetails) .collect(Collectors.toSet()); if (replicas.contains(targetDn)) { @@ -733,6 +738,21 @@ public CompletableFuture move(ContainerID cid, ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED); return ret; } + + // check whether {Existing replicas + Target_Dn - Source_Dn} + // satisfies current placement policy + Set movedReplicas = new HashSet<>(); + movedReplicas.addAll(currentReplicas); + movedReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); + movedReplicas.add(ContainerReplica.newBuilder() + .setDatanodeDetails(targetDn).build()); + ContainerPlacementStatus placementStatus = getPlacementStatus( + movedReplicas, cif.getReplicationConfig().getRequiredNodes()); + if (!placementStatus.isPolicySatisfied()) { + ret.complete(MoveResult.PLACEMENT_POLICY_NOT_SATISFIED); + return ret; + } + inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn)); inflightMoveFuture.putIfAbsent(cid, ret); sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn)); From 3fec3d66f1d7f523c6e6b174753229621fd9669d Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Tue, 6 Jul 2021 14:21:14 +0800 Subject: [PATCH 04/12] fix a logic error --- .../hdds/scm/container/ReplicationManager.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 0565e724b24e..13ca28dbd8e8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -1178,17 +1178,17 @@ private void handleOverReplicatedContainer(final ContainerInfo container, } if (isTargetDnInReplicaSet) { if (isSourceDnInReplicaSet) { - // if the target is present, and source disappears somehow, - // we can consider move is successful. - inflightMoveFuture.get(id).complete(MoveResult.COMPLETED); - inflightMove.remove(id); - inflightMoveFuture.remove(id); - } else { // if the container is in inflightMove and target datanode is // included in the replicas, then swap the source datanode to // first of the replica list if exists, so the source datanode // will be first removed if possible. eligibleReplicas.add(0, eligibleReplicas.remove(sourceDnPos)); + } else { + // if the target is present, and source disappears somehow, + // we can consider move is successful. + inflightMoveFuture.get(id).complete(MoveResult.COMPLETED); + inflightMove.remove(id); + inflightMoveFuture.remove(id); } } else { if (isSourceDnInReplicaSet) { From 0d62fef549ef66f993039f6b8f53bd9c4a5bad90 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Wed, 7 Jul 2021 18:12:43 +0800 Subject: [PATCH 05/12] add handleMoveIfNeeded --- .../scm/container/ReplicationManager.java | 144 +++++++++--------- 1 file changed, 75 insertions(+), 69 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 13ca28dbd8e8..4c6d3ef6dbc7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -155,8 +155,6 @@ enum MoveResult { COMPLETED, // RM is not running RM_NOT_RUNNING, - // source datanode disappear somehow - REPLICATION_FAIL_SOURCE_DISAPPEAR, // replication fail because the container does not exist in src REPLICATION_FAIL_NOT_EXIST_IN_SOURCE, // replication fail because the container exists in target @@ -741,14 +739,8 @@ public CompletableFuture move(ContainerID cid, // check whether {Existing replicas + Target_Dn - Source_Dn} // satisfies current placement policy - Set movedReplicas = new HashSet<>(); - movedReplicas.addAll(currentReplicas); - movedReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); - movedReplicas.add(ContainerReplica.newBuilder() - .setDatanodeDetails(targetDn).build()); - ContainerPlacementStatus placementStatus = getPlacementStatus( - movedReplicas, cif.getReplicationConfig().getRequiredNodes()); - if (!placementStatus.isPolicySatisfied()) { + if (!isPolicySatisfiedAfterMove(cif, srcDn, targetDn, + currentReplicas.stream().collect(Collectors.toList()))) { ret.complete(MoveResult.PLACEMENT_POLICY_NOT_SATISFIED); return ret; } @@ -762,6 +754,28 @@ public CompletableFuture move(ContainerID cid, return ret; } + /** + * Returns whether {Existing replicas + Target_Dn - Source_Dn} + * satisfies current placement policy. + * @param cif Container Info of moved container + * @param srcDn DatanodeDetails of source data node + * @param targetDn DatanodeDetails of target data node + * @param replicas container replicas + * @return whether the placement policy is satisfied after move + */ + private boolean isPolicySatisfiedAfterMove(ContainerInfo cif, + DatanodeDetails srcDn, DatanodeDetails targetDn, + final List replicas){ + Set movedReplicas = + replicas.stream().collect(Collectors.toSet()); + movedReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); + movedReplicas.add(ContainerReplica.newBuilder() + .setDatanodeDetails(targetDn).build()); + ContainerPlacementStatus placementStatus = getPlacementStatus( + movedReplicas, cif.getReplicationConfig().getRequiredNodes()); + return placementStatus.isPolicySatisfied(); + } + /** * Returns the number replica which are pending creation for the given * container ID. @@ -1153,78 +1167,70 @@ private void handleOverReplicatedContainer(final ContainerInfo container, break; } } - eligibleReplicas.removeAll(unhealthyReplicas); - boolean isInMove = inflightMove.containsKey(id); - boolean isSourceDnInReplicaSet = false; - boolean isTargetDnInReplicaSet = false; - - if (isInMove) { - Pair movePair = - inflightMove.get(id); - final DatanodeDetails sourceDN = movePair.getKey(); - isSourceDnInReplicaSet = eligibleReplicas.stream() - .anyMatch(r -> r.getDatanodeDetails().equals(sourceDN)); - isTargetDnInReplicaSet = eligibleReplicas.stream() - .anyMatch(r -> r.getDatanodeDetails() - .equals(movePair.getValue())); - int sourceDnPos = 0; - for (int i = 0; i < eligibleReplicas.size(); i++) { - if (eligibleReplicas.get(i).getDatanodeDetails() - .equals(sourceDN)) { - sourceDnPos = i; - break; - } - } - if (isTargetDnInReplicaSet) { - if (isSourceDnInReplicaSet) { - // if the container is in inflightMove and target datanode is - // included in the replicas, then swap the source datanode to - // first of the replica list if exists, so the source datanode - // will be first removed if possible. - eligibleReplicas.add(0, eligibleReplicas.remove(sourceDnPos)); - } else { - // if the target is present, and source disappears somehow, - // we can consider move is successful. - inflightMoveFuture.get(id).complete(MoveResult.COMPLETED); - inflightMove.remove(id); - inflightMoveFuture.remove(id); - } - } else { - if (isSourceDnInReplicaSet) { - // a container replica that being moved should not be removed. - // if the container is in inflightMove and target datanode is not - // included in the replicas, then swap the source datanode to the - // last of the replica list, so the source datanode will not - // be removed. - eligibleReplicas.add(eligibleReplicas.remove(sourceDnPos)); - } else { - // if the target is not present, and source disappears somehow, - // we fail the completeableFuture directly. - inflightMoveFuture.get(id).complete( - MoveResult.REPLICATION_FAIL_SOURCE_DISAPPEAR); - inflightMove.remove(id); - inflightMoveFuture.remove(id); - } - } - } + + excess -= handleMoveIfNeeded(container, eligibleReplicas); removeExcessReplicasIfNeeded(excess, container, eligibleReplicas); + } + } + + /** + * if the container is in inflightMove, handle move if needed. + * + * @param cif ContainerInfo + * @param eligibleReplicas An list of replicas, which may have excess replicas + * @return minus how many replica is removed through sending delete command + */ + private int handleMoveIfNeeded(final ContainerInfo cif, + final List eligibleReplicas) { + int minus = 0; + final ContainerID cid = cif.containerID(); + if (!inflightMove.containsKey(cid)) { + return minus; + } - if (isInMove && isSourceDnInReplicaSet && isTargetDnInReplicaSet) { + Pair movePair = + inflightMove.get(cid); + + final DatanodeDetails srcDn = movePair.getKey(); + final DatanodeDetails targetDn = movePair.getValue(); + boolean isSourceDnInReplicaSet; + boolean isTargetDnInReplicaSet; + + isSourceDnInReplicaSet = eligibleReplicas.stream() + .anyMatch(r -> r.getDatanodeDetails().equals(srcDn)); + isTargetDnInReplicaSet = eligibleReplicas.stream() + .anyMatch(r -> r.getDatanodeDetails().equals(targetDn)); + + // if target datanode is not in replica set , nothing to do + if (isTargetDnInReplicaSet) { + if (isSourceDnInReplicaSet && + isPolicySatisfiedAfterMove(cif, srcDn, targetDn, eligibleReplicas)) { + sendDeleteCommand(cif, srcDn, true); + eligibleReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); + minus++; + } else if (!isSourceDnInReplicaSet){ + // if the target is present, and source disappears somehow, + // we can consider move is successful. + inflightMoveFuture.get(cif).complete(MoveResult.COMPLETED); + inflightMove.remove(cif); + inflightMoveFuture.remove(cif); + } else { // if source and target datanode are both in the replicaset, // but we can not delete source datanode for now (e.g., // there is only 3 replicas or not policy-statisfied , etc.), // we just complete the future without sending a delete command. LOG.info("can not remove source replica after successfully " + "replicated to target datanode"); - inflightMoveFuture.get(id).complete(MoveResult.DELETE_FAIL_POLICY); - inflightMove.remove(id); - inflightMoveFuture.remove(id); + inflightMoveFuture.get(cif).complete(MoveResult.DELETE_FAIL_POLICY); + inflightMove.remove(cif); + inflightMoveFuture.remove(cif); } } - } + return minus; + } /** * remove execess replicas if needed, replicationFactor and placement policy From ce29c94ebbd5d8833bf9a7a567802a93a17e9fad Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Wed, 7 Jul 2021 19:33:30 +0800 Subject: [PATCH 06/12] fix findbugs --- .../scm/container/ReplicationManager.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 02d66b3ce4f6..4c1bda620316 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -1207,23 +1207,24 @@ private int handleMoveIfNeeded(final ContainerInfo cif, sendDeleteCommand(cif, srcDn, true); eligibleReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); minus++; - } else if (!isSourceDnInReplicaSet){ - // if the target is present, and source disappears somehow, - // we can consider move is successful. - inflightMoveFuture.get(cif).complete(MoveResult.COMPLETED); - inflightMove.remove(cif); - inflightMoveFuture.remove(cif); } else { - // if source and target datanode are both in the replicaset, - // but we can not delete source datanode for now (e.g., - // there is only 3 replicas or not policy-statisfied , etc.), - // we just complete the future without sending a delete command. - LOG.info("can not remove source replica after successfully " + - "replicated to target datanode"); - inflightMoveFuture.get(cif).complete(MoveResult.DELETE_FAIL_POLICY); - inflightMove.remove(cif); - inflightMoveFuture.remove(cif); + if (!isSourceDnInReplicaSet) { + // if the target is present, and source disappears somehow, + // we can consider move is successful. + inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED); + } else { + // if source and target datanode are both in the replicaset, + // but we can not delete source datanode for now (e.g., + // there is only 3 replicas or not policy-statisfied , etc.), + // we just complete the future without sending a delete command. + LOG.info("can not remove source replica after successfully " + + "replicated to target datanode"); + inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY); + } + inflightMove.remove(cid); + inflightMoveFuture.remove(cid); } + } return minus; From 0c5ca395e97df7e19bf2910ed54fd951e47737c8 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Mon, 12 Jul 2021 15:16:46 +0800 Subject: [PATCH 07/12] update --- .../scm/container/ReplicationManager.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index ef320e00607e..ca94fb663cb9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -1203,8 +1203,18 @@ private int handleMoveIfNeeded(final ContainerInfo cif, // if target datanode is not in replica set , nothing to do if (isTargetDnInReplicaSet) { + Set eligibleReplicaSet = + eligibleReplicas.stream().collect(Collectors.toSet()); + int replicationFactor = + cif.getReplicationConfig().getRequiredNodes(); + ContainerPlacementStatus currentCPS = + getPlacementStatus(eligibleReplicaSet, replicationFactor); + eligibleReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); + ContainerPlacementStatus afterMoveCPS = + getPlacementStatus(eligibleReplicaSet, replicationFactor); + if (isSourceDnInReplicaSet && - isPolicySatisfiedAfterMove(cif, srcDn, targetDn, eligibleReplicas)) { + isPlacementStatusActuallyEqual(currentCPS, afterMoveCPS)) { sendDeleteCommand(cif, srcDn, true); eligibleReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); minus++; @@ -1225,7 +1235,6 @@ private int handleMoveIfNeeded(final ContainerInfo cif, inflightMove.remove(cid); inflightMoveFuture.remove(cid); } - } return minus; @@ -1263,9 +1272,7 @@ private void removeExcessReplicasIfNeeded(int excess, eligibleSet.remove(r); ContainerPlacementStatus nowPS = getPlacementStatus(eligibleSet, replicationFactor); - if ((!ps.isPolicySatisfied() - && nowPS.actualPlacementCount() == ps.actualPlacementCount()) - || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) { + if (isPlacementStatusActuallyEqual(ps, nowPS)) { // Remove the replica if the container was already unsatisfied // and losing this replica keep actual placement count unchanged. // OR if losing this replica still keep satisfied @@ -1284,6 +1291,19 @@ private void removeExcessReplicasIfNeeded(int excess, } } + /** + * whether the given two ContainerPlacementStatus are actually equal. + * + * @param cps1 ContainerPlacementStatus + * @param cps2 ContainerPlacementStatus + */ + private boolean isPlacementStatusActuallyEqual( + ContainerPlacementStatus cps1, + ContainerPlacementStatus cps2) { + return cps1.actualPlacementCount() == cps2.actualPlacementCount() || + cps1.isPolicySatisfied() && cps2.isPolicySatisfied(); + } + /** * Given a set of ContainerReplica, transform it to a list of DatanodeDetails * and then check if the list meets the container placement policy. From 98abb100e1827a1ec2cd1261367385d5baea7292 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Tue, 13 Jul 2021 14:49:15 +0800 Subject: [PATCH 08/12] add unit test for move --- .../scm/container/ReplicationManager.java | 9 +- .../scm/container/TestReplicationManager.java | 209 +++++++++++++++++- 2 files changed, 210 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index ca94fb663cb9..786c3f4d96ca 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -52,7 +52,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.events.SCMEvents; @@ -750,7 +751,7 @@ public CompletableFuture move(ContainerID cid, inflightMoveFuture.putIfAbsent(cid, ret); sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn)); } - LOG.info("receive a move requset about container {} , from {} to {}", + LOG.info("receive a move request about container {} , from {} to {}", cid, srcDn.getUuid(), targetDn.getUuid()); return ret; } @@ -771,7 +772,9 @@ private boolean isPolicySatisfiedAfterMove(ContainerInfo cif, replicas.stream().collect(Collectors.toSet()); movedReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); movedReplicas.add(ContainerReplica.newBuilder() - .setDatanodeDetails(targetDn).build()); + .setDatanodeDetails(targetDn) + .setContainerID(cif.containerID()) + .setContainerState(State.CLOSED).build()); ContainerPlacementStatus placementStatus = getPlacementStatus( movedReplicas, cif.getReplicationConfig().getRequiredNodes()); return placementStatus.isPolicySatisfied(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index 2975d21fa284..afb70c5d65bb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; @@ -31,6 +32,7 @@ .ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault; +import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; @@ -57,6 +59,8 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -90,6 +94,7 @@ public class TestReplicationManager { private ContainerManagerV2 containerManager; private OzoneConfiguration conf; private SCMNodeManager scmNodeManager; + private GenericTestUtils.LogCapturer scmLogs; @Before public void setup() @@ -99,6 +104,7 @@ public void setup() HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 0, TimeUnit.SECONDS); + scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG); containerManager = Mockito.mock(ContainerManagerV2.class); nodeManager = new SimpleMockNodeManager(); eventQueue = new EventQueue(); @@ -163,6 +169,7 @@ public void setup() nodeManager); serviceManager.notifyStatusChanged(); + scmLogs.clearOutput(); Thread.sleep(100L); } @@ -1095,14 +1102,199 @@ public void testUnderReplicatedNotHealthySource() } /** - * before Replication Manager generates a completablefuture for a move option, - * some Prerequisites should be met. + * if all the prerequisites are satisfied, move should work as expected. */ + @Test + public void testMove() throws SCMException, NodeNotFoundException, + InterruptedException, ExecutionException { + final ContainerInfo container = createContainer(LifeCycleState.CLOSED); + ContainerID id = container.containerID(); + ContainerReplica dn1 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); + CompletableFuture cf = + replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(scmLogs.getOutput().contains( + "receive a move request about container")); + Assert.assertTrue(datanodeCommandHandler.received( + SCMCommandProto.Type.replicateContainerCommand, dn3)); + Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.replicateContainerCommand)); + + //replicate container to dn3 + addReplicaToDn(container, dn3, CLOSED); + replicationManager.processContainersNow(); + Thread.sleep(100L); + + Assert.assertTrue(datanodeCommandHandler.received( + SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails())); + Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.deleteContainerCommand)); + containerStateManager.removeContainerReplica(id, dn1); + + replicationManager.processContainersNow(); + Thread.sleep(100L); + + Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED); + } + /** + * test src and target datanode become unhealthy when moving. + */ + @Test + public void testDnBecameUnhealthyWhenMoving() throws SCMException, + NodeNotFoundException, InterruptedException, ExecutionException { + final ContainerInfo container = createContainer(LifeCycleState.CLOSED); + ContainerID id = container.containerID(); + ContainerReplica dn1 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); + CompletableFuture cf = + replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(scmLogs.getOutput().contains( + "receive a move request about container")); + + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE)); + replicationManager.processContainersNow(); + Thread.sleep(100L); + + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); + + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); + cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + addReplicaToDn(container, dn3, CLOSED); + replicationManager.processContainersNow(); + Thread.sleep(100L); + nodeManager.setNodeStatus(dn1.getDatanodeDetails(), + new NodeStatus(IN_SERVICE, STALE)); + replicationManager.processContainersNow(); + Thread.sleep(100L); + + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.DELETION_FAIL_NODE_UNHEALTHY); + } + + /** + * before Replication Manager generates a completablefuture for a move option, + * some Prerequisites should be satisfied. + */ @Test public void testMovePrerequisites() - throws SCMException, ContainerNotFoundException, - NodeNotFoundException, InterruptedException { + throws SCMException, NodeNotFoundException, + InterruptedException, ExecutionException { + //all conditions is met + final ContainerInfo container = createContainer(LifeCycleState.CLOSED); + ContainerID id = container.containerID(); + ContainerReplica dn1 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + ContainerReplica dn2 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); + ContainerReplica dn4 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + + CompletableFuture cf; + //the above move is executed successfully, so there may be some item in + //inflightReplication or inflightDeletion. here we stop replication manager + //to clear these states, which may impact the tests below. + //we don't need a running replicationManamger now + replicationManager.stop(); + Thread.sleep(100L); + cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.RM_NOT_RUNNING); + replicationManager.start(); + Thread.sleep(100L); + + //container in not in CLOSED state + for (LifeCycleState state : LifeCycleState.values()) { + if (state != LifeCycleState.CLOSED) { + container.setState(state); + cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED); + } + } + container.setState(LifeCycleState.CLOSED); + + //Node is not in healthy state + for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) { + if (state != HEALTHY) { + nodeManager.setNodeStatus(dn3, + new NodeStatus(IN_SERVICE, state)); + cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); + cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails()); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); + } + } + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); + + //Node is not in IN_SERVICE state + for (HddsProtos.NodeOperationalState state : + HddsProtos.NodeOperationalState.values()) { + if (state != IN_SERVICE) { + nodeManager.setNodeStatus(dn3, + new NodeStatus(state, HEALTHY)); + cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE); + cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails()); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE); + } + } + nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); + + //container exists in target datanode + cf = replicationManager.move(id, dn1.getDatanodeDetails(), + dn2.getDatanodeDetails()); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET); + + //container does not exist in source datanode + cf = replicationManager.move(id, dn3, dn3); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE); + + replicationManager.start(); + //make container over relplicated to test the + // case that container is in inflightDeletion + ContainerReplica dn5 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED); + ContainerReplica dn6 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED); + replicationManager.processContainersNow(); + //waiting for inflightDeletion generation + Thread.sleep(100L); + cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION); + resetReplicationManager(); + + //make the replica num be 2 to test the case + //that container is in inflightReplication + containerStateManager.removeContainerReplica(id, dn6); + containerStateManager.removeContainerReplica(id, dn5); + containerStateManager.removeContainerReplica(id, dn4); + //replication manager should generate inflightReplication + replicationManager.processContainersNow(); + //waiting for inflightReplication generation + Thread.sleep(100L); + cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + Assert.assertTrue(cf.isDone() && cf.get() == + MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION); } private ContainerInfo createContainer(LifeCycleState containerState) @@ -1123,14 +1315,21 @@ private DatanodeDetails addNode(NodeStatus nodeStatus) { private void resetReplicationManager() throws InterruptedException { replicationManager.stop(); - Thread.sleep(500); + Thread.sleep(100L); replicationManager.start(); + Thread.sleep(100L); } private ContainerReplica addReplica(ContainerInfo container, NodeStatus nodeStatus, State replicaState) throws ContainerNotFoundException { DatanodeDetails dn = addNode(nodeStatus); + return addReplicaToDn(container, dn, replicaState); + } + + private ContainerReplica addReplicaToDn(ContainerInfo container, + DatanodeDetails dn, State replicaState) + throws ContainerNotFoundException { // Using the same originID for all replica in the container set. If each // replica has a unique originID, it causes problems in ReplicationManager // when processing over-replicated containers. From 7b659632ccb7d2d495d4c3a7d8a5891efddc1cfa Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Tue, 13 Jul 2021 15:17:39 +0800 Subject: [PATCH 09/12] triger CI for unit test From c08502f486a0bc4fb879598f40a554091f3ca3e3 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Fri, 16 Jul 2021 17:25:36 +0800 Subject: [PATCH 10/12] add UT and change hanldMove --- .../scm/container/ReplicationManager.java | 105 ++++++++---------- .../scm/container/TestReplicationManager.java | 41 +++++++ 2 files changed, 87 insertions(+), 59 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 786c3f4d96ca..a40dc2d0a09f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -529,9 +529,9 @@ private void updateInflightAction(final ContainerInfo container, if (isCompleted || isUnhealthy || isTimeout || isNotInService) { iter.remove(); updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout, - id, a.datanode, inflightActions); + container, a.datanode, inflightActions); } - } catch (NodeNotFoundException e) { + } catch (NodeNotFoundException | ContainerNotFoundException e) { // Should not happen, but if it does, just remove the action as the // node somehow does not exist; iter.remove(); @@ -549,16 +549,18 @@ private void updateInflightAction(final ContainerInfo container, * @param isUnhealthy is the datanode unhealthy * @param isCompleted is the action completed * @param isTimeout is the action timeout - * @param id Container to update + * @param container Container to update * @param dn datanode which is removed from the inflightActions * @param inflightActions inflightReplication (or) inflightDeletion */ private void updateMoveIfNeeded(final boolean isUnhealthy, final boolean isCompleted, final boolean isTimeout, - final ContainerID id, final DatanodeDetails dn, + final ContainerInfo container, final DatanodeDetails dn, final Map> inflightActions) { + List> inflightActions) + throws ContainerNotFoundException { // make sure inflightMove contains the container + ContainerID id = container.containerID(); if (!inflightMove.containsKey(id)) { return; } @@ -603,7 +605,6 @@ private void updateMoveIfNeeded(final boolean isUnhealthy, return; } - //if replication is completed , nothing to do if (!(isInflightReplication && isCompleted)) { if (isInflightReplication) { if (isUnhealthy) { @@ -627,6 +628,9 @@ private void updateMoveIfNeeded(final boolean isUnhealthy, } inflightMove.remove(id); inflightMoveFuture.remove(id); + } else { + handleMove(container, + containerManager.getContainerReplicas(id)); } } @@ -1169,78 +1173,61 @@ private void handleOverReplicatedContainer(final ContainerInfo container, } } eligibleReplicas.removeAll(unhealthyReplicas); - - excess -= handleMoveIfNeeded(container, eligibleReplicas); - removeExcessReplicasIfNeeded(excess, container, eligibleReplicas); } } /** - * if the container is in inflightMove, handle move if needed. + * if the container is in inflightMove, handle move. * * @param cif ContainerInfo - * @param eligibleReplicas An list of replicas, which may have excess replicas - * @return minus how many replica is removed through sending delete command + * @param replicaSet An Set of replicas, which may have excess replicas */ - private int handleMoveIfNeeded(final ContainerInfo cif, - final List eligibleReplicas) { - int minus = 0; + private void handleMove(final ContainerInfo cif, + final Set replicaSet) { final ContainerID cid = cif.containerID(); - if (!inflightMove.containsKey(cid)) { - return minus; - } - - Pair movePair = - inflightMove.get(cid); - - final DatanodeDetails srcDn = movePair.getKey(); - final DatanodeDetails targetDn = movePair.getValue(); - boolean isSourceDnInReplicaSet; - boolean isTargetDnInReplicaSet; - - isSourceDnInReplicaSet = eligibleReplicas.stream() - .anyMatch(r -> r.getDatanodeDetails().equals(srcDn)); - isTargetDnInReplicaSet = eligibleReplicas.stream() - .anyMatch(r -> r.getDatanodeDetails().equals(targetDn)); + if (inflightMove.containsKey(cid)) { + Pair movePair = + inflightMove.get(cid); + final DatanodeDetails srcDn = movePair.getKey(); + ContainerReplicaCount replicaCount = + getContainerReplicaCount(cif, replicaSet); + + if(!replicaSet.stream() + .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){ + // if the target is present but source disappears somehow, + // we can consider move is successful. + inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED); + inflightMove.remove(cid); + inflightMoveFuture.remove(cid); + return; + } - // if target datanode is not in replica set , nothing to do - if (isTargetDnInReplicaSet) { - Set eligibleReplicaSet = - eligibleReplicas.stream().collect(Collectors.toSet()); int replicationFactor = cif.getReplicationConfig().getRequiredNodes(); ContainerPlacementStatus currentCPS = - getPlacementStatus(eligibleReplicaSet, replicationFactor); - eligibleReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); - ContainerPlacementStatus afterMoveCPS = - getPlacementStatus(eligibleReplicaSet, replicationFactor); - - if (isSourceDnInReplicaSet && - isPlacementStatusActuallyEqual(currentCPS, afterMoveCPS)) { + getPlacementStatus(replicaSet, replicationFactor); + Set newReplicaSet = replicaSet. + stream().collect(Collectors.toSet()); + newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); + ContainerPlacementStatus newCPS = + getPlacementStatus(newReplicaSet, replicationFactor); + + if (replicaCount.isOverReplicated() && + isPlacementStatusActuallyEqual(currentCPS, newCPS)) { sendDeleteCommand(cif, srcDn, true); - eligibleReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); - minus++; } else { - if (!isSourceDnInReplicaSet) { - // if the target is present, and source disappears somehow, - // we can consider move is successful. - inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED); - } else { - // if source and target datanode are both in the replicaset, - // but we can not delete source datanode for now (e.g., - // there is only 3 replicas or not policy-statisfied , etc.), - // we just complete the future without sending a delete command. - LOG.info("can not remove source replica after successfully " + - "replicated to target datanode"); - inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY); - } + // if source and target datanode are both in the replicaset, + // but we can not delete source datanode for now (e.g., + // there is only 3 replicas or not policy-statisfied , etc.), + // we just complete the future without sending a delete command. + LOG.info("can not remove source replica after successfully " + + "replicated to target datanode"); + inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY); inflightMove.remove(cid); inflightMoveFuture.remove(cid); } } - - return minus; } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index afb70c5d65bb..be08cd35d273 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -1142,6 +1142,47 @@ public void testMove() throws SCMException, NodeNotFoundException, Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED); } + /** + * make sure RM does not delete replica if placement policy is not satisfied. + */ + @Test + public void testMoveNotDeleteSrcIfPolicyNotSatisfied() + throws SCMException, NodeNotFoundException, + InterruptedException, ExecutionException { + final ContainerInfo container = createContainer(LifeCycleState.CLOSED); + ContainerID id = container.containerID(); + ContainerReplica dn1 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + ContainerReplica dn2 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); + CompletableFuture cf = + replicationManager.move(id, dn1.getDatanodeDetails(), dn4); + Assert.assertTrue(scmLogs.getOutput().contains( + "receive a move request about container")); + Assert.assertTrue(datanodeCommandHandler.received( + SCMCommandProto.Type.replicateContainerCommand, dn4)); + Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.replicateContainerCommand)); + + //replicate container to dn4 + addReplicaToDn(container, dn4, CLOSED); + //now, replication succeeds, but replica in dn2 lost, + //and there are only tree replicas totally, so rm should + //not delete the replica on dn1 + containerStateManager.removeContainerReplica(id, dn2); + replicationManager.processContainersNow(); + Thread.sleep(100L); + + Assert.assertFalse(datanodeCommandHandler.received( + SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails())); + + Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.DELETE_FAIL_POLICY); + } + + /** * test src and target datanode become unhealthy when moving. */ From b6147be847af4843d731a603b955b2bc9d0f2fa2 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Fri, 16 Jul 2021 19:26:58 +0800 Subject: [PATCH 11/12] update --- .../apache/hadoop/hdds/scm/container/ReplicationManager.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index a40dc2d0a09f..045a0e899557 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -629,7 +629,7 @@ private void updateMoveIfNeeded(final boolean isUnhealthy, inflightMove.remove(id); inflightMoveFuture.remove(id); } else { - handleMove(container, + deleteSrcDnForMove(container, containerManager.getContainerReplicas(id)); } } @@ -1179,11 +1179,12 @@ private void handleOverReplicatedContainer(final ContainerInfo container, /** * if the container is in inflightMove, handle move. + * This function assumes replication has been completed * * @param cif ContainerInfo * @param replicaSet An Set of replicas, which may have excess replicas */ - private void handleMove(final ContainerInfo cif, + private void deleteSrcDnForMove(final ContainerInfo cif, final Set replicaSet) { final ContainerID cid = cif.containerID(); if (inflightMove.containsKey(cid)) { From cccca4d0b584fdd88792c8733c6c7b51cac44869 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Fri, 16 Jul 2021 20:13:30 +0800 Subject: [PATCH 12/12] add wait time for CI --- .../hadoop/hdds/scm/container/TestReplicationManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index f514915d699c..4528e565d8c6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -1120,6 +1120,7 @@ public void testMove() throws SCMException, NodeNotFoundException, replicationManager.move(id, dn1.getDatanodeDetails(), dn3); Assert.assertTrue(scmLogs.getOutput().contains( "receive a move request about container")); + Thread.sleep(100L); Assert.assertTrue(datanodeCommandHandler.received( SCMCommandProto.Type.replicateContainerCommand, dn3)); Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount( @@ -1162,6 +1163,7 @@ public void testMoveNotDeleteSrcIfPolicyNotSatisfied() replicationManager.move(id, dn1.getDatanodeDetails(), dn4); Assert.assertTrue(scmLogs.getOutput().contains( "receive a move request about container")); + Thread.sleep(100L); Assert.assertTrue(datanodeCommandHandler.received( SCMCommandProto.Type.replicateContainerCommand, dn4)); Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(