From 127a6a35f7e224454b1fdc12f79713f4a1310e1a Mon Sep 17 00:00:00 2001
From: Priyesh Karatha
Date: Wed, 2 Apr 2025 13:08:25 +0530
Subject: [PATCH 1/5] HDDS-12624. Fix pipeline limit check to prevent incorrect pipeline creation

---
 .../scm/pipeline/RatisPipelineProvider.java  | 17 +++++++-----
 .../pipeline/TestRatisPipelineProvider.java  | 27 +++++++++++++++++++
 2 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index c95f7208b80c..8538abac9b7e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -111,7 +111,7 @@ private boolean exceedPipelineNumberLimit(
     if (maxPipelinePerDatanode > 0) {
       return (getPipelineStateManager().getPipelines(replicationConfig).size()
           - getPipelineStateManager().getPipelines(replicationConfig,
-              PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
+              PipelineState.CLOSED).size()) >= maxPipelinePerDatanode *
           getNodeManager().getNodeCount(NodeStatus.inServiceHealthy()) /
           replicationConfig.getRequiredNodes();
     }
@@ -120,7 +120,7 @@ private boolean exceedPipelineNumberLimit(
     if (pipelineNumberLimit > 0) {
       return (getPipelineStateManager().getPipelines(replicationConfig).size()
           - getPipelineStateManager().getPipelines(
-              replicationConfig, PipelineState.CLOSED).size()) >
+              replicationConfig, PipelineState.CLOSED).size()) >=
           (pipelineNumberLimit - getPipelineStateManager()
               .getPipelines(RatisReplicationConfig
                   .getInstance(ReplicationFactor.ONE))
               .size());
     }
@@ -147,10 +147,15 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
       List excludedNodes, List favoredNodes)
       throws IOException {
     if (exceedPipelineNumberLimit(replicationConfig)) {
-      throw new SCMException("Ratis pipeline number meets the limit: " +
-          pipelineNumberLimit + " replicationConfig : " +
-          replicationConfig,
-          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+      String limitInfo = (maxPipelinePerDatanode > 0)
+          ? String.format("per dataNode: %d", maxPipelinePerDatanode)
+          : String.format(": %d", pipelineNumberLimit);
+
+      throw new SCMException(
+          String.format("Ratis pipeline number meets the limit %s replicationConfig: %s",
+              limitInfo, replicationConfig),
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE
+      );
     }
 
     List dns;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 96ba75507088..44168fe56c66 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -64,6 +64,8 @@
 import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 /**
  * Test for {@link RatisPipelineProvider}.
@@ -361,6 +363,31 @@ public void testCreatePipelinesWhenNotEnoughSpace(@TempDir File tempDir) throws
     }
   }
 
+  @ParameterizedTest
+  @CsvSource({ "1, 2", "2, 5" })
+  public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelineCount) throws Exception {
+    init(limit);
+
+    // Create pipelines up to the limit (2 for limit=1, 5 for limit=2).
+    for (int i = 0; i < pipelineCount; i++) {
+      stateManager.addPipeline(
+          provider.create(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+              new ArrayList<>(), new ArrayList<>()).getProtobufMessage(ClientVersion.CURRENT_VERSION)
+      );
+    }
+
+    // Verify that creating an additional pipeline throws an exception.
+    SCMException exception = assertThrows(SCMException.class, () ->
+        provider.create(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+            new ArrayList<>(), new ArrayList<>())
+    );
+
+    // Validate exception message.
+    String expectedError = String.format(
+        "Ratis pipeline number meets the limit per dataNode: %d replicationConfig: RATIS/THREE", limit);
+    assertEquals(expectedError, exception.getMessage());
+  }
+
   private void addPipeline(
       List dns, Pipeline.PipelineState open,
       ReplicationConfig replicationConfig)

From f6a86f9c26f46b9ad082e566bbc0a01d34807fa5 Mon Sep 17 00:00:00 2001
From: Priyesh Karatha
Date: Wed, 2 Apr 2025 16:24:59 +0530
Subject: [PATCH 2/5] HDDS-12624. Fixing test case and addressing review comments

---
 .../apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java | 2 +-
 .../hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java    | 2 +-
 .../hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 8538abac9b7e..f5a4cda0e5ca 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -148,7 +148,7 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
       throws IOException {
     if (exceedPipelineNumberLimit(replicationConfig)) {
       String limitInfo = (maxPipelinePerDatanode > 0)
-          ? String.format("per dataNode: %d", maxPipelinePerDatanode)
+          ? String.format("per datanode: %d", maxPipelinePerDatanode)
           : String.format(": %d", pipelineNumberLimit);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 44168fe56c66..a13354b70065 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -384,7 +384,7 @@ public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelin
       // Validate exception message.
       String expectedError = String.format(
-          "Ratis pipeline number meets the limit per dataNode: %d replicationConfig: RATIS/THREE", limit);
+          "Ratis pipeline number meets the limit per datanode: %d replicationConfig: RATIS/THREE", limit);
       assertEquals(expectedError, exception.getMessage());
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index cad08856281b..699c03c6c941 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -145,7 +145,7 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
         pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
             ReplicationFactor.THREE)),
         "pipeline creation should fail after shutting down pipeline");
-    assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES, ioe.getResult());
+    assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE, ioe.getResult());
 
     // make sure pipelines is destroyed
     waitForPipelines(0);

From fba8b1d7d34fec18746d703b3777cf321afbc33b Mon Sep 17 00:00:00 2001
From: Priyesh Karatha
Date: Thu, 3 Apr 2025 00:57:10 +0530
Subject: [PATCH 3/5] Fixing test case failures.

---
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index cb2315f7fd56..92dd22a8e551 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -433,7 +433,7 @@ public void testScmLayoutOnRegister()
     // creation, they will fail with not enough healthy nodes for ratis 3
     // pipeline. Therefore we do not have to worry about this create call
    // failing due to datanodes reaching their maximum pipeline limit.
-    assertPipelineCreationFailsWithNotEnoughNodes(1);
+    assertPipelineCreationFailsWithExceedingLimit(2);
 
     // Heartbeat bad MLV nodes back to healthy.
     nodeManager.processLayoutVersionReport(badMlvNode1, CORRECT_LAYOUT_PROTO);
@@ -465,6 +465,18 @@ private void assertPipelineCreationFailsWithNotEnoughNodes(
         actualNodeCount);
   }
 
+  private void assertPipelineCreationFailsWithExceedingLimit(int limit) {
+    SCMException ex = assertThrows(SCMException.class, () -> {
+      ReplicationConfig ratisThree =
+          ReplicationConfig.fromProtoTypeAndFactor(
+              HddsProtos.ReplicationType.RATIS,
+              HddsProtos.ReplicationFactor.THREE);
+      scm.getPipelineManager().createPipeline(ratisThree);
+    }, "3 nodes should not have been found for a pipeline.");
+    assertThat(ex.getMessage()).contains("Ratis pipeline number meets the limit per datanode: " +
+        limit);
+  }
+
   private void assertPipelines(HddsProtos.ReplicationFactor factor,
       Predicate countCheck, Collection allowedDNs)
       throws Exception {

From 6137e2d75eaeccba80a363f960891229b1eb6147 Mon Sep 17 00:00:00 2001
From: Priyesh Karatha
Date: Tue, 8 Apr 2025 09:58:15 +0530
Subject: [PATCH 4/5] Refactoring function for better readability

---
 .../scm/pipeline/RatisPipelineProvider.java | 45 +++++++++++--------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index f5a4cda0e5ca..56631e8c9370 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -101,32 +101,41 @@ public RatisPipelineProvider(NodeManager nodeManager,
     }
   }
 
-  private boolean exceedPipelineNumberLimit(
-      RatisReplicationConfig replicationConfig) {
+  private boolean exceedPipelineNumberLimit(RatisReplicationConfig replicationConfig) {
+    // Apply limits only for replication factor THREE
     if (replicationConfig.getReplicationFactor() != ReplicationFactor.THREE) {
-      // Only put limits for Factor THREE pipelines.
       return false;
     }
-    // Per datanode limit
+
+    PipelineStateManager pipelineStateManager = getPipelineStateManager();
+
+    int totalActivePipelines = pipelineStateManager.getPipelines(replicationConfig).size();
+
+    int closedPipelines = pipelineStateManager.getPipelines(replicationConfig, PipelineState.CLOSED).size();
+
+    int openPipelines = totalActivePipelines - closedPipelines;
+
+    // Check per-datanode pipeline limit
     if (maxPipelinePerDatanode > 0) {
-      return (getPipelineStateManager().getPipelines(replicationConfig).size()
-          - getPipelineStateManager().getPipelines(replicationConfig,
-              PipelineState.CLOSED).size()) >= maxPipelinePerDatanode *
-          getNodeManager().getNodeCount(NodeStatus.inServiceHealthy()) /
-          replicationConfig.getRequiredNodes();
+      int healthyNodeCount = getNodeManager()
+          .getNodeCount(NodeStatus.inServiceHealthy());
+
+      int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount)
+          / replicationConfig.getRequiredNodes();
+
+      return openPipelines >= allowedOpenPipelines;
     }
 
-    // Global limit
+    // Check global pipeline limit
    if (pipelineNumberLimit > 0) {
-      return (getPipelineStateManager().getPipelines(replicationConfig).size()
-          - getPipelineStateManager().getPipelines(
-              replicationConfig, PipelineState.CLOSED).size()) >=
-          (pipelineNumberLimit - getPipelineStateManager()
-              .getPipelines(RatisReplicationConfig
-                  .getInstance(ReplicationFactor.ONE))
-              .size());
-    }
+      int factorOnePipelineCount = pipelineStateManager
+          .getPipelines(RatisReplicationConfig.getInstance(ReplicationFactor.ONE)).size();
+
+      int allowedOpenPipelines = pipelineNumberLimit - factorOnePipelineCount;
+
+      return openPipelines >= allowedOpenPipelines;
+    }
+    // No limits are set
     return false;
   }

From 12640c6f42d7d679aecca36dcb4b5c949ab20fcf Mon Sep 17 00:00:00 2001
From: Priyesh Karatha
Date: Mon, 21 Apr 2025 17:08:57 +0530
Subject: [PATCH 5/5] Addressing review comments.
---
 .../scm/pipeline/RatisPipelineProvider.java  | 11 +----------
 .../hdds/scm/node/TestSCMNodeManager.java    | 19 ++++++++++---------
 .../pipeline/TestRatisPipelineProvider.java  |  2 +-
 3 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index a4223646c3be..a7c09238b4cf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -108,31 +108,22 @@ private boolean exceedPipelineNumberLimit(RatisReplicationConf
     }
     PipelineStateManager pipelineStateManager = getPipelineStateManager();
-
     int totalActivePipelines = pipelineStateManager.getPipelines(replicationConfig).size();
-
     int closedPipelines = pipelineStateManager.getPipelines(replicationConfig, PipelineState.CLOSED).size();
-
     int openPipelines = totalActivePipelines - closedPipelines;
-
     // Check per-datanode pipeline limit
     if (maxPipelinePerDatanode > 0) {
       int healthyNodeCount = getNodeManager()
           .getNodeCount(NodeStatus.inServiceHealthy());
-
       int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount)
           / replicationConfig.getRequiredNodes();
-
       return openPipelines >= allowedOpenPipelines;
     }
-
     // Check global pipeline limit
     if (pipelineNumberLimit > 0) {
       int factorOnePipelineCount = pipelineStateManager
           .getPipelines(RatisReplicationConfig.getInstance(ReplicationFactor.ONE)).size();
-
       int allowedOpenPipelines = pipelineNumberLimit - factorOnePipelineCount;
-
       return openPipelines >= allowedOpenPipelines;
     }
     // No limits are set
@@ -161,7 +152,7 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
           : String.format(": %d", pipelineNumberLimit);
 
       throw new SCMException(
-          String.format("Ratis pipeline number meets the limit %s replicationConfig: %s",
+          String.format("Cannot create pipeline as it would exceed the limit %s replicationConfig: %s",
              limitInfo, replicationConfig),
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE
       );
     }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 8d7e24f2e9fc..bd10fb3da0b0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -469,15 +469,16 @@ private void assertPipelineCreationFailsWithNotEnoughNodes(
   }
 
   private void assertPipelineCreationFailsWithExceedingLimit(int limit) {
-    SCMException ex = assertThrows(SCMException.class, () -> {
-      ReplicationConfig ratisThree =
-          ReplicationConfig.fromProtoTypeAndFactor(
-              HddsProtos.ReplicationType.RATIS,
-              HddsProtos.ReplicationFactor.THREE);
-      scm.getPipelineManager().createPipeline(ratisThree);
-    }, "3 nodes should not have been found for a pipeline.");
-    assertThat(ex.getMessage()).contains("Ratis pipeline number meets the limit per datanode: " +
-        limit);
+    // Build once, outside the assertion
+    ReplicationConfig config = ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE);
+    SCMException ex = assertThrows(
+        SCMException.class,
+        () -> scm.getPipelineManager().createPipeline(config),
+        "3 nodes should not have been found for a pipeline.");
+    assertThat(ex.getMessage())
+        .contains("Cannot create pipeline as it would exceed the limit per datanode: " + limit);
   }
 
   private void assertPipelines(HddsProtos.ReplicationFactor factor,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index a13354b70065..3b40d6c6077c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -384,7 +384,7 @@ public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelin
       // Validate exception message.
       String expectedError = String.format(
-          "Ratis pipeline number meets the limit per datanode: %d replicationConfig: RATIS/THREE", limit);
+          "Cannot create pipeline as it would exceed the limit per datanode: %d replicationConfig: RATIS/THREE", limit);
       assertEquals(expectedError, exception.getMessage());
     }