diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index ccf7b308d41c..51d376fa1581 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; @@ -49,6 +50,7 @@ import org.apache.hadoop.hdds.utils.db.DelegatedCodec; import org.apache.hadoop.hdds.utils.db.Proto2Codec; import org.apache.hadoop.ozone.ClientVersion; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -337,6 +339,22 @@ void reportDatanode(DatanodeDetails dn) throws IOException { nodeStatus.put(dn, System.currentTimeMillis()); } + @VisibleForTesting + public DatanodeDetails removeFirstFromNodeStatus() { + Iterator<DatanodeDetails> iterator = nodeStatus.keySet().iterator(); + DatanodeDetails firstKey = null; + if (iterator.hasNext()) { + firstKey = iterator.next(); + nodeStatus.remove(firstKey); + } + return firstKey; + } + + @VisibleForTesting + public void setInNodeStatus(DatanodeDetails dd) { + nodeStatus.put(dd, Time.now()); + } + public boolean isHealthy() { // EC pipelines are not reported by the DN and do not have a leader. 
If a // node goes stale or dead, EC pipelines will be closed like RATIS pipelines diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java index fb95215bff21..253375934b8b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -51,7 +52,7 @@ public class OneReplicaPipelineSafeModeRule extends private static final String NAME = "AtleastOneDatanodeReportedRule"; private int thresholdCount; - private final Set<PipelineID> reportedPipelineIDSet = new HashSet<>(); + private Set<PipelineID> reportedPipelineIDSet = new HashSet<>(); private Set<PipelineID> oldPipelineIDSet; private int currentReportedPipelineCount = 0; private PipelineManager pipelineManager; @@ -85,11 +86,19 @@ protected TypedEvent<PipelineReportFromDatanode> getEventType() { @Override protected synchronized boolean validate() { + if (validateBasedOnReportProcessing()) { + return currentReportedPipelineCount >= thresholdCount; + } + + updateReportedPipelineSet(); return currentReportedPipelineCount >= thresholdCount; } @Override protected synchronized void process(PipelineReportFromDatanode report) { + if (!validateBasedOnReportProcessing()) { + return; + } Preconditions.checkNotNull(report); for (PipelineReport report1 : report.getReport().getPipelineReportList()) { Pipeline pipeline; @@ -137,6 +146,11 @@ public synchronized int getCurrentReportedPipelineCount() { return currentReportedPipelineCount; } + @VisibleForTesting + public Set<PipelineID> 
getReportedPipelineIDSet() { + return reportedPipelineIDSet; + } + @Override public String getStatusText() { String status = String.format( @@ -171,6 +185,25 @@ public synchronized void refresh(boolean forceRefresh) { } } + private void updateReportedPipelineSet() { + List<Pipeline> openRatisPipelines = + pipelineManager.getPipelines(RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + Pipeline.PipelineState.OPEN); + + for (Pipeline pipeline : openRatisPipelines) { + PipelineID pipelineID = pipeline.getId(); + boolean allDNsPipelineReported = pipeline.getNodeSet().size() == 3; + boolean notAlreadyReported = !reportedPipelineIDSet.contains(pipelineID); + boolean wasExistingPipeline = oldPipelineIDSet.contains(pipelineID); + + if (allDNsPipelineReported && notAlreadyReported && wasExistingPipeline) { + getSafeModeMetrics().incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount(); + currentReportedPipelineCount++; + reportedPipelineIDSet.add(pipelineID); + } + } + } + private void initializeRule(boolean refresh) { oldPipelineIDSet = pipelineManager.getPipelines( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java index 8d5e005ae02e..011caa725940 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.safemode; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -193,6 +194,40 @@ public void testOneReplicaPipelineRuleMixedPipelines() throws Exception { GenericTestUtils.waitFor(() -> 
rule.validate(), 1000, 5000); } + @Test + public void testOneReplicaPipelineRuleWithReportProcessingFalse() throws Exception { + // As with 30 nodes, we can create 7 pipelines with replication factor 3. + // (This is because in node manager for every 10 nodes, 7 nodes are + // healthy, 2 are stale, and one is dead.) + int totalNodes = 30; + int ratisPipelineCount = 7; + int standalonePipelineCount = 0; + + setup(totalNodes, ratisPipelineCount, standalonePipelineCount); + + // Disable validation based on report processing. + rule.setValidateBasedOnReportProcessing(false); + + List<Pipeline> pipelines = pipelineManager.getPipelines(); + assertFalse(pipelines.isEmpty()); + + // Pick the first pipeline and remove one node to make nodeSet.size() != 3 + Pipeline targetPipeline = pipelines.get(0); + DatanodeDetails removedNode = targetPipeline.removeFirstFromNodeStatus(); + + // Validation should now fail because not all pipelines meet the 3-node condition + assertFalse(rule.validate()); + + // Re-add the node back so pipeline meets 3-node condition + targetPipeline.setInNodeStatus(removedNode); + + // Now the rule should validate successfully + assertTrue(rule.validate()); + + // Assert that the pipeline got added back to the reported set + assertTrue(rule.getReportedPipelineIDSet().contains(targetPipeline.getId())); + } + private void createPipelines(int count, HddsProtos.ReplicationFactor factor) throws Exception { for (int i = 0; i < count; i++) {