diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 522a7477c8dd..51598a77ce88 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -313,6 +313,7 @@ public Builder(Pipeline pipeline) { this.state = pipeline.state; this.nodeStatus = pipeline.nodeStatus; this.nodesInOrder = pipeline.nodesInOrder.get(); + this.leaderId = pipeline.getLeaderId(); } public Builder setId(PipelineID id1) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 89c4eb3d64f1..baeb6dcf318a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -362,6 +362,15 @@ public static PipelineReportFromDatanode getPipelineReportFromDatanode( return new PipelineReportFromDatanode(dn, reportBuilder.build()); } + public static PipelineReportFromDatanode getPipelineReportFromDatanode( + DatanodeDetails dn, PipelineID pipelineID, boolean isLeader) { + PipelineReportsProto.Builder reportBuilder = + PipelineReportsProto.newBuilder(); + reportBuilder.addPipelineReport(PipelineReport.newBuilder() + .setPipelineID(pipelineID.getProtobuf()).setIsLeader(isLeader)); + return new PipelineReportFromDatanode(dn, reportBuilder.build()); + } + public static void openAllRatisPipelines(PipelineManager pipelineManager) throws IOException { // Pipeline is created by background thread diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 69227021a694..9e2cfa9488ab 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -34,12 +34,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -174,18 +173,17 @@ public void testPipelineReport() throws IOException { Assert .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen()); - // get pipeline report from each dn in the pipeline + // pipeline is not healthy until all dns report PipelineReportHandler pipelineReportHandler = new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); - for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline is not healthy until all dns report - Assert.assertFalse( - pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); - } + List nodes = pipeline.getNodes(); + Assert.assertFalse( + pipelineManager.getPipeline(pipeline.getId()).isHealthy()); + // get pipeline report from each dn in the pipeline + nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, + pipelineReportHandler, false, eventQueue)); + sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, + pipelineReportHandler, true, eventQueue); // pipeline is healthy when all dns report Assert @@ -197,13 +195,11 @@ public void testPipelineReport() throws IOException { // close the pipeline pipelineManager.finalizeAndDestroyPipeline(pipeline, false); - for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline report for destroyed pipeline should be ignored - pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); - } + // pipeline report for destroyed pipeline should be ignored + nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, + pipelineReportHandler, false, eventQueue)); + sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, + pipelineReportHandler, true, eventQueue); try { pipelineManager.getPipeline(pipeline.getId()); @@ -271,7 +267,7 @@ public void testPipelineCreationFailedMetric() throws Exception { numPipelineCreateFailed = getLongCounter( "NumPipelineCreationFailed", metrics); Assert.assertTrue(numPipelineCreateFailed == 0); - + // clean up pipelineManager.close(); } @@ -358,16 +354,16 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { List nodes = pipeline.getNodes(); Assert.assertEquals(3, nodes.size()); // Send report for all but no leader - nodes.forEach(dn -> - sendPipelineReport(dn, pipeline, pipelineReportHandler, false)); + nodes.forEach(dn -> sendPipelineReport(dn, pipeline, pipelineReportHandler, + false, eventQueue)); Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); - nodes.subList(0, 2).forEach(dn -> - sendPipelineReport(dn, pipeline, pipelineReportHandler, false)); + nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, + pipelineReportHandler, false, eventQueue)); sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, - pipelineReportHandler, true); + pipelineReportHandler, true, eventQueue); Assert.assertEquals(Pipeline.PipelineState.OPEN, pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); @@ -377,16 +373,9 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { private void sendPipelineReport(DatanodeDetails dn, Pipeline pipeline, PipelineReportHandler pipelineReportHandler, - boolean isLeader) { - - PipelineReportsProto.Builder reportProtoBuilder = - PipelineReportsProto.newBuilder(); - PipelineReport.Builder reportBuilder = PipelineReport.newBuilder(); - reportBuilder.setPipelineID(pipeline.getId().getProtobuf()); - reportBuilder.setIsLeader(isLeader); - - pipelineReportHandler.onMessage(new PipelineReportFromDatanode(dn, - reportProtoBuilder.addPipelineReport( - reportBuilder.build()).build()), new EventQueue()); + boolean isLeader, EventQueue eventQueue) { + PipelineReportFromDatanode report = + TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(), isLeader); + pipelineReportHandler.onMessage(report, eventQueue); } }