From 5a306c104830b6f78ba5cb7e6404cf74bb1cbbfb Mon Sep 17 00:00:00 2001
From: S O'Donnell
Date: Wed, 5 Aug 2020 11:37:28 +0100
Subject: [PATCH 1/3] Initial changes to implement feature

---
 .../scm/pipeline/PipelinePlacementPolicy.java | 33 ++++++++++++++
 .../hdds/scm/container/MockNodeManager.java   | 13 ++++++
 .../pipeline/TestPipelinePlacementPolicy.java | 45 +++++++++++++++++++
 3 files changed, 91 insertions(+)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 524b5ec8b216..5f3a52d26424 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -163,9 +163,42 @@ List<DatanodeDetails> filterViableNodes(
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
+
+    if (!checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
+      boolean multipleRacks = multipleRacksAvailable(healthyNodes);
+      boolean multipleRacksAfterFilter = multipleRacksAvailable(healthyList);
+      if (multipleRacks && !multipleRacksAfterFilter) {
+        msg = "The cluster has multiple racks, but all nodes with available " +
+            "pipeline capacity are on a single rack. There are insufficient " +
+            "cross rack nodes available to create a pipeline";
+        LOG.debug(msg);
+        throw new SCMException(msg,
+            SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+      }
+    }
     return healthyList;
   }
 
+  /**
+   * Given a list of Datanodes, return false if the entire list is only on a
+   * single rack, or the list is empty. If there is more than 1 rack, return
+   * true.
+   * @param dns List of datanodes to check
+   * @return True if there are multiple racks, false otherwise
+   */
+  private boolean multipleRacksAvailable(List<DatanodeDetails> dns) {
+    if (dns.size() <= 1) {
+      return false;
+    }
+    String initialRack = dns.get(0).getNetworkLocation();
+    for (DatanodeDetails dn : dns) {
+      if (!dn.getNetworkLocation().equals(initialRack)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Pipeline placement choose datanodes to join the pipeline.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 5b635a7bee94..4b8b37dee273 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -390,6 +390,19 @@ public void clearCommandQueue(UUID dnId) {
     }
   }
 
+  public void setNodeState(DatanodeDetails dn, HddsProtos.NodeState state) {
+    healthyNodes.remove(dn);
+    staleNodes.remove(dn);
+    deadNodes.remove(dn);
+    if (state == HEALTHY) {
+      healthyNodes.add(dn);
+    } else if (state == STALE) {
+      staleNodes.add(dn);
+    } else {
+      deadNodes.add(dn);
+    }
+  }
+
   /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 8d6a28cc2a0d..59350569dae4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -439,6 +440,50 @@ public void testValidatePlacementPolicySingleRackInCluster() {
     assertEquals(0, status.misReplicationCount());
   }
 
+  @Test
+  public void testPreventNonRackAwarePipelinesWithSkewedRacks()
+      throws Exception {
+    cluster = initTopology();
+
+    List<DatanodeDetails> dns = new ArrayList<>();
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host1", "/rack1"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host2", "/rack2"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host3", "/rack2"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host4", "/rack2"));
+
+    nodeManager = new MockNodeManager(cluster, dns,
+        false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    placementPolicy = new PipelinePlacementPolicy(
+        nodeManager, stateManager, conf);
+
+    // Set the first node to its pipeline limit. This means there are only
+    // 3 hosts on a single rack available for new pipelines
+    insertHeavyNodesIntoNodeManager(dns, 1);
+
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+
+    LambdaTestUtils.intercept(SCMException.class,
+        "The cluster has multiple racks, but all nodes with " +
+            "available pipeline capacity are on a single rack.",
+        () -> placementPolicy.chooseDatanodes(
+            new ArrayList<>(), new ArrayList<>(), nodesRequired, 0));
+    // Set the only node on rack1 stale, meaning we only have 1 rack alive now
+    nodeManager.setNodeState(dns.get(0), HddsProtos.NodeState.STALE);
+
+    // As there is only 1 rack alive, the 3 DNs on /rack2 should be returned
+    List<DatanodeDetails> pickedDns = placementPolicy.chooseDatanodes(
+        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);
+
+    assertEquals(3, pickedDns.size());
+    assertTrue(pickedDns.contains(dns.get(1)));
+    assertTrue(pickedDns.contains(dns.get(2)));
+    assertTrue(pickedDns.contains(dns.get(3)));
+  }
+
   private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
     HashSet<UUID> uuids = nodes.stream().
         map(DatanodeDetails::getUuid).
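
Illustrative note (not part of the patch series): the heart of PATCH 1 is the two-pass rack check in filterViableNodes(): the healthy node list is checked for rack diversity, the capacity-filtered list is checked again, and placement fails if diversity was lost by the filter. The following is a minimal, self-contained sketch of that logic; it uses plain rack strings instead of the real DatanodeDetails/NetworkTopology APIs, and the class name RackCheckSketch is invented for illustration.

import java.util.Arrays;
import java.util.List;

// Standalone sketch of the rack-skew check added in PATCH 1. The real code
// walks DatanodeDetails.getNetworkLocation(); here a node is just its rack.
public final class RackCheckSketch {

  // Mirrors multipleRacksAvailable(): false for an empty or single-node list
  // or a single rack, true as soon as a second rack is seen.
  static boolean multipleRacksAvailable(List<String> racks) {
    if (racks.size() <= 1) {
      return false;
    }
    String initialRack = racks.get(0);
    for (String rack : racks) {
      if (!rack.equals(initialRack)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // All healthy nodes in the cluster span two racks...
    List<String> healthy = Arrays.asList("/rack1", "/rack2", "/rack2", "/rack2");
    // ...but after filtering out nodes already at their pipeline limit,
    // only rack2 nodes remain.
    List<String> viable = Arrays.asList("/rack2", "/rack2", "/rack2");

    boolean multipleRacks = multipleRacksAvailable(healthy);
    boolean multipleRacksAfterFilter = multipleRacksAvailable(viable);

    // This is the condition under which the patch now throws SCMException
    // instead of silently building a pipeline on a single rack.
    if (multipleRacks && !multipleRacksAfterFilter) {
      System.out.println("Would reject placement: rack-aware pipeline not possible");
    }
  }
}
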
From ca74ee6feb2e316ac73d2a8117eb9b47619606b3 Mon Sep 17 00:00:00 2001
From: S O'Donnell
Date: Fri, 14 Aug 2020 10:24:45 +0100
Subject: [PATCH 2/3] Refactor tests based on review comments

---
 .../scm/pipeline/PipelinePlacementPolicy.java | 12 +++--
 .../pipeline/TestPipelinePlacementPolicy.java | 53 ++++++++++++++-----
 2 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 5f3a52d26424..a61344e367fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -56,6 +56,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   private final int heavyNodeCriteria;
   private static final int REQUIRED_RACKS = 2;
 
+  public static final String MULTIPLE_RACK_PIPELINE_MSG =
+      "The cluster has multiple racks, but all nodes with available " +
+      "pipeline capacity are on a single rack. There are insufficient " +
+      "cross rack nodes available to create a pipeline";
+
   /**
    * Constructs a pipeline placement with considering network topology,
    * load balancing and rack awareness.
@@ -168,11 +173,8 @@ List<DatanodeDetails> filterViableNodes(
       boolean multipleRacks = multipleRacksAvailable(healthyNodes);
       boolean multipleRacksAfterFilter = multipleRacksAvailable(healthyList);
       if (multipleRacks && !multipleRacksAfterFilter) {
-        msg = "The cluster has multiple racks, but all nodes with available " +
-            "pipeline capacity are on a single rack. There are insufficient " +
-            "cross rack nodes available to create a pipeline";
-        LOG.debug(msg);
-        throw new SCMException(msg,
+        LOG.debug(MULTIPLE_RACK_PIPELINE_MSG);
+        throw new SCMException(MULTIPLE_RACK_PIPELINE_MSG,
             SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
       }
     }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 59350569dae4..a8ef17ec6693 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -42,10 +42,11 @@
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 
-import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -441,7 +442,7 @@ public void testValidatePlacementPolicySingleRackInCluster() {
   }
 
   @Test
-  public void testPreventNonRackAwarePipelinesWithSkewedRacks()
+  public void test3NodesInSameRackReturnedWhenOnlyOneHealthyRackIsPresent()
       throws Exception {
     cluster = initTopology();
 
@@ -460,18 +461,9 @@ public void testPreventNonRackAwarePipelinesWithSkewedRacks()
     placementPolicy = new PipelinePlacementPolicy(
         nodeManager, stateManager, conf);
 
-    // Set the first node to its pipeline limit. This means there are only
-    // 3 hosts on a single rack available for new pipelines
-    insertHeavyNodesIntoNodeManager(dns, 1);
-
     int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
-
-    LambdaTestUtils.intercept(SCMException.class,
-        "The cluster has multiple racks, but all nodes with " +
-            "available pipeline capacity are on a single rack.",
-        () -> placementPolicy.chooseDatanodes(
-            new ArrayList<>(), new ArrayList<>(), nodesRequired, 0));
-    // Set the only node on rack1 stale, meaning we only have 1 rack alive now
+
+    // Set the only node on rack1 stale. This makes the cluster effectively a
+    // single rack.
     nodeManager.setNodeState(dns.get(0), HddsProtos.NodeState.STALE);
 
     // As there is only 1 rack alive, the 3 DNs on /rack2 should be returned
@@ -484,6 +476,41 @@ public void testPreventNonRackAwarePipelinesWithSkewedRacks()
     assertTrue(pickedDns.contains(dns.get(3)));
   }
 
+  @Rule
+  public ExpectedException thrownExp = ExpectedException.none();
+
+  @Test
+  public void testExceptionIsThrownWhenRackAwarePipelineCanNotBeCreated()
+      throws Exception {
+    thrownExp.expect(SCMException.class);
+    thrownExp.expectMessage(PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG);
+
+    cluster = initTopology();
+
+    List<DatanodeDetails> dns = new ArrayList<>();
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host1", "/rack1"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host2", "/rack2"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host3", "/rack2"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host4", "/rack2"));
+
+    nodeManager = new MockNodeManager(cluster, dns,
+        false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    placementPolicy = new PipelinePlacementPolicy(
+        nodeManager, stateManager, conf);
+
+    // Set the first node to its pipeline limit. This means there are only
+    // 3 hosts on a single rack available for new pipelines
+    insertHeavyNodesIntoNodeManager(dns, 1);
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+
+    placementPolicy.chooseDatanodes(
+        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);
+  }
+
   private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
     HashSet<UUID> uuids = nodes.stream().
         map(DatanodeDetails::getUuid).
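
Illustrative note (not part of the patch series): PATCH 2 replaces LambdaTestUtils.intercept with JUnit 4's ExpectedException rule and exposes the error text as PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG, so the tests no longer duplicate the message string. Below is a minimal, standalone sketch of that rule pattern; the class ExpectedExceptionRuleExample, the test method and the IllegalStateException are placeholders, not code from the patch.

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

// Minimal JUnit 4 illustration of the ExpectedException rule used in PATCH 2.
public class ExpectedExceptionRuleExample {

  @Rule
  public ExpectedException thrownExp = ExpectedException.none();

  @Test
  public void testFailureMessageIsMatched() {
    // Expectations must be declared before the code that is expected to throw.
    thrownExp.expect(IllegalStateException.class);
    thrownExp.expectMessage("no viable rack");

    // expectMessage() performs a substring match, which is why a shared
    // constant such as MULTIPLE_RACK_PIPELINE_MSG can be asserted verbatim.
    throw new IllegalStateException("no viable rack for pipeline placement");
  }
}
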
From a80d91f1e93ef14bb4571833097c100cf7307ed3 Mon Sep 17 00:00:00 2001
From: S O'Donnell
Date: Mon, 17 Aug 2020 11:45:07 +0100
Subject: [PATCH 3/3] Handle excluded nodes correctly

---
 .../scm/pipeline/PipelinePlacementPolicy.java |  2 +-
 .../pipeline/TestPipelinePlacementPolicy.java | 57 +++++++++++--------
 2 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index a61344e367fd..84efdc2e1403 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -125,6 +125,7 @@ List<DatanodeDetails> filterViableNodes(
     // get nodes in HEALTHY state
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    boolean multipleRacks = multipleRacksAvailable(healthyNodes);
     if (excludedNodes != null) {
       healthyNodes.removeAll(excludedNodes);
     }
@@ -170,7 +171,6 @@ List<DatanodeDetails> filterViableNodes(
     }
 
     if (!checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
-      boolean multipleRacks = multipleRacksAvailable(healthyNodes);
       boolean multipleRacksAfterFilter = multipleRacksAvailable(healthyList);
       if (multipleRacks && !multipleRacksAfterFilter) {
         LOG.debug(MULTIPLE_RACK_PIPELINE_MSG);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index a8ef17ec6693..1274608c39c2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -444,22 +444,7 @@ public void testValidatePlacementPolicySingleRackInCluster() {
   @Test
   public void test3NodesInSameRackReturnedWhenOnlyOneHealthyRackIsPresent()
       throws Exception {
-    cluster = initTopology();
-
-    List<DatanodeDetails> dns = new ArrayList<>();
-    dns.add(MockDatanodeDetails
-        .createDatanodeDetails("host1", "/rack1"));
-    dns.add(MockDatanodeDetails
-        .createDatanodeDetails("host2", "/rack2"));
-    dns.add(MockDatanodeDetails
-        .createDatanodeDetails("host3", "/rack2"));
-    dns.add(MockDatanodeDetails
-        .createDatanodeDetails("host4", "/rack2"));
-
-    nodeManager = new MockNodeManager(cluster, dns,
-        false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
-    placementPolicy = new PipelinePlacementPolicy(
-        nodeManager, stateManager, conf);
+    List<DatanodeDetails> dns = setupSkewedRacks();
     int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
 
     // Set the only node on rack1 stale. This makes the cluster effectively a
@@ -485,6 +470,37 @@ public void testExceptionIsThrownWhenRackAwarePipelineCanNotBeCreated()
     thrownExp.expect(SCMException.class);
     thrownExp.expectMessage(PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG);
 
+    List<DatanodeDetails> dns = setupSkewedRacks();
+
+    // Set the first node to its pipeline limit. This means there are only
+    // 3 hosts on a single rack available for new pipelines
+    insertHeavyNodesIntoNodeManager(dns, 1);
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+
+    placementPolicy.chooseDatanodes(
+        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);
+  }
+
+  @Test
+  public void testExceptionThrownRackAwarePipelineCanNotBeCreatedExcludedNode()
+      throws Exception {
+    thrownExp.expect(SCMException.class);
+    thrownExp.expectMessage(PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG);
+
+    List<DatanodeDetails> dns = setupSkewedRacks();
+
+    // Set the first node to its pipeline limit. This means there are only
+    // 3 hosts on a single rack available for new pipelines
+    insertHeavyNodesIntoNodeManager(dns, 1);
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+
+    List<DatanodeDetails> excluded = new ArrayList<>();
+    excluded.add(dns.get(0));
+    placementPolicy.chooseDatanodes(
+        excluded, new ArrayList<>(), nodesRequired, 0);
+  }
+
+  private List<DatanodeDetails> setupSkewedRacks() {
     cluster = initTopology();
 
     List<DatanodeDetails> dns = new ArrayList<>();
     dns.add(MockDatanodeDetails
@@ -501,14 +517,7 @@ public void testExceptionIsThrownWhenRackAwarePipelineCanNotBeCreated()
         false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
     placementPolicy = new PipelinePlacementPolicy(
         nodeManager, stateManager, conf);
-
-    // Set the first node to its pipeline limit. This means there are only
-    // 3 hosts on a single rack available for new pipelines
-    insertHeavyNodesIntoNodeManager(dns, 1);
-    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
-
-    placementPolicy.chooseDatanodes(
-        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);
+    return dns;
   }
 
   private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
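
Illustrative note (not part of the patch series): PATCH 3 moves the multipleRacks computation so it runs on the full healthy node list before excludedNodes are removed. Based on the patch and its new excluded-node test, the intent appears to be that excluding the only node on another rack should still count the cluster as multi-rack, so placement fails loudly rather than quietly producing a single-rack pipeline. The sketch below illustrates that ordering difference with plain rack strings; ExcludedNodeOrderingSketch is a made-up name and the lists stand in for DatanodeDetails.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of why PATCH 3 computes multipleRacks before excluded nodes are
// removed from the healthy list.
public final class ExcludedNodeOrderingSketch {

  static boolean multipleRacksAvailable(List<String> racks) {
    if (racks.size() <= 1) {
      return false;
    }
    String initialRack = racks.get(0);
    for (String rack : racks) {
      if (!rack.equals(initialRack)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Healthy nodes: one on rack1, three on rack2. The caller excludes the
    // rack1 node (for example, it already participates in a pipeline).
    List<String> healthy = new ArrayList<>(
        Arrays.asList("/rack1", "/rack2", "/rack2", "/rack2"));
    List<String> excluded = Arrays.asList("/rack1");

    // PATCH 3 ordering: measure the rack spread of the whole healthy set first.
    boolean multipleRacks = multipleRacksAvailable(healthy);            // true

    healthy.removeAll(excluded);
    boolean multipleRacksAfterFilter = multipleRacksAvailable(healthy); // false

    // If both flags were computed on the reduced list, both would be false and
    // no exception would be raised; with this ordering the mismatch is caught.
    System.out.println("reject placement = "
        + (multipleRacks && !multipleRacksAfterFilter));
  }
}
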