Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ public Builder(Pipeline pipeline) {
this.state = pipeline.state;
this.nodeStatus = pipeline.nodeStatus;
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
}

public Builder setId(PipelineID id1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,15 @@ public static PipelineReportFromDatanode getPipelineReportFromDatanode(
return new PipelineReportFromDatanode(dn, reportBuilder.build());
}

public static PipelineReportFromDatanode getPipelineReportFromDatanode(
DatanodeDetails dn, PipelineID pipelineID, boolean isLeader) {
PipelineReportsProto.Builder reportBuilder =
PipelineReportsProto.newBuilder();
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
.setPipelineID(pipelineID.getProtobuf()).setIsLeader(isLeader));
return new PipelineReportFromDatanode(dn, reportBuilder.build());
}

public static void openAllRatisPipelines(PipelineManager pipelineManager)
throws IOException {
// Pipeline is created by background thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,19 @@ public void testPipelineReport() throws IOException {
// get pipeline report from each dn in the pipeline
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
Boolean isLeader = true;
for (DatanodeDetails dn: pipeline.getNodes()) {
PipelineReportFromDatanode pipelineReportFromDatanode =
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(),
isLeader);
if (isLeader) {
isLeader = false;
}
// pipeline is not healthy until all dns report
Assert.assertFalse(
pipelineManager.getPipeline(pipeline.getId()).isHealthy());
pipelineReportHandler
.onMessage(pipelineReportFromDatanode, new EventQueue());
.onMessage(pipelineReportFromDatanode, eventQueue);
}

// pipeline is healthy when all dns report
Expand All @@ -197,12 +202,17 @@ public void testPipelineReport() throws IOException {
// close the pipeline
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);

isLeader = true;
for (DatanodeDetails dn: pipeline.getNodes()) {
PipelineReportFromDatanode pipelineReportFromDatanode =
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(),
isLeader);
if (isLeader) {
isLeader = false;
}
// pipeline report for destroyed pipeline should be ignored
pipelineReportHandler
.onMessage(pipelineReportFromDatanode, new EventQueue());
.onMessage(pipelineReportFromDatanode, eventQueue);
}

try {
Expand Down