diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 2828f6ea41ca..15a7bea84dea 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -18,23 +18,26 @@ package org.apache.hadoop.hdds.scm.pipeline; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; +import com.google.common.base.Preconditions; /** * Represents a group of datanodes which store a container. 
@@ -50,6 +53,8 @@ public final class Pipeline { private Map nodeStatus; // nodes with ordered distance to client private ThreadLocal> nodesInOrder = new ThreadLocal<>(); + // Current reported Leader for the pipeline + private UUID leaderId; /** * The immutable properties of pipeline object is used in @@ -102,6 +107,22 @@ public PipelineState getPipelineState() { return state; } + /** + * Return the pipeline leader's UUID. + * + * @return DatanodeDetails.UUID. + */ + public UUID getLeaderId() { + return leaderId; + } + + /** + * Pipeline object, outside of letting leader id to be set, is immutable. + */ + void setLeaderId(UUID leaderId) { + this.leaderId = leaderId; + } + /** * Returns the list of nodes which form this pipeline. * @@ -154,13 +175,13 @@ void reportDatanode(DatanodeDetails dn) throws IOException { nodeStatus.put(dn, System.currentTimeMillis()); } - boolean isHealthy() { + public boolean isHealthy() { for (Long reportedTime : nodeStatus.values()) { if (reportedTime < 0) { return false; } } - return true; + return leaderId != null; } public boolean isEmpty() { @@ -174,7 +195,7 @@ public HddsProtos.Pipeline getProtobufMessage() .setType(type) .setFactor(factor) .setState(PipelineState.getProtobuf(state)) - .setLeaderID("") + .setLeaderID(leaderId != null ? leaderId.toString() : "") .addAllMembers(nodeStatus.keySet().stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())); @@ -206,6 +227,8 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) .setFactor(pipeline.getFactor()) .setType(pipeline.getType()) .setState(PipelineState.fromProtobuf(pipeline.getState())) + .setLeaderId(StringUtils.isNotEmpty(pipeline.getLeaderID()) ? 
+ UUID.fromString(pipeline.getLeaderID()) : null) .setNodes(pipeline.getMembersList().stream() .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList())) .setNodesInOrder(pipeline.getMemberOrdersList()) @@ -274,6 +297,7 @@ public static class Builder { private Map nodeStatus = null; private List nodeOrder = null; private List nodesInOrder = null; + private UUID leaderId = null; public Builder() {} @@ -306,6 +330,11 @@ public Builder setState(PipelineState state1) { return this; } + public Builder setLeaderId(UUID leaderId1) { + this.leaderId = leaderId1; + return this; + } + public Builder setNodes(List nodes) { this.nodeStatus = new LinkedHashMap<>(); nodes.forEach(node -> nodeStatus.put(node, -1L)); @@ -324,6 +353,7 @@ public Pipeline build() { Preconditions.checkNotNull(state); Preconditions.checkNotNull(nodeStatus); Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus); + pipeline.setLeaderId(leaderId); if (nodeOrder != null && !nodeOrder.isEmpty()) { // This branch is for build from ProtoBuf diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index d2bb355ff8a4..39a01dc1faf7 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -67,13 +67,13 @@ enum PipelineState { } message Pipeline { - required string leaderID = 1; - repeated DatanodeDetailsProto members = 2; + repeated DatanodeDetailsProto members = 1; // TODO: remove the state and leaderID from this class - optional PipelineState state = 3 [default = PIPELINE_ALLOCATED]; - optional ReplicationType type = 4 [default = STAND_ALONE]; - optional ReplicationFactor factor = 5 [default = ONE]; - required PipelineID id = 6; + optional PipelineState state = 2 [default = PIPELINE_ALLOCATED]; + optional ReplicationType type = 3 [default = STAND_ALONE]; + optional ReplicationFactor factor = 4 [default = ONE]; + required PipelineID id = 5; + optional string leaderID = 6; repeated 
uint32 memberOrders = 7; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java index e7f4347e9e43..eaf10120575a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java @@ -31,9 +31,9 @@ /** * Publishes Pipeline which will be sent to SCM as part of heartbeat. - * PipelineReport consist of the following information about each containers: + * PipelineReport consists of the following information about each pipeline: * - pipelineID - * + * - leaderID */ public class PipelineReportPublisher extends ReportPublisher { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 9d6a189833aa..b617169de421 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -35,6 +35,8 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerProxy; @@ -854,4 +856,10 @@
executor.shutdown(); } } + + @Override + public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, + RaftPeerId raftPeerId) { + ratisServer.handleLeaderChangedNotification(groupMemberId, raftPeerId); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index f816bcb252b0..1146394fee91 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -68,12 +69,14 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Collections; import java.util.Set; import java.util.UUID; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -107,6 +110,9 @@ private static long nextCallId() { // TODO: Remove the gids set when Ratis supports an api to query active // pipelines private final Set raftGids = new HashSet<>(); + private final RaftPeerId raftPeerId; + // pipelines for which I am the leader + private Map groupLeaderMap = new ConcurrentHashMap<>(); private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher 
dispatcher, ContainerController containerController, @@ -136,9 +142,10 @@ private XceiverServerRatis(DatanodeDetails dd, int port, TimeUnit.MILLISECONDS); this.dispatcher = dispatcher; this.containerController = containerController; + this.raftPeerId = RatisHelper.toRaftPeerId(dd); RaftServer.Builder builder = - RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd)) + RaftServer.newBuilder().setServerId(raftPeerId) .setProperties(serverProperties) .setStateMachineRegistry(this::getStateMachine); if (tlsConfig != null) { @@ -593,6 +600,7 @@ public List getPipelineReport() { for (RaftGroupId groupId : gids) { reports.add(PipelineReport.newBuilder() .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf()) + .setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE)) .build()); } return reports; @@ -676,9 +684,26 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { void notifyGroupRemove(RaftGroupId gid) { raftGids.remove(gid); + // Remove any entries for group leader map + groupLeaderMap.remove(gid); } void notifyGroupAdd(RaftGroupId gid) { raftGids.add(gid); } + + void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, + RaftPeerId raftPeerId1) { + LOG.info("Leader change notification received for group: {} with new " + + "leaderId: {}", groupMemberId.getGroupId(), raftPeerId1); + // Save the reported leader to be sent with the report to SCM + boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1); + groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup); + if (context != null && leaderForGroup) { + // Publish new report from leader + context.addReport(context.getParent().getContainer().getPipelineReport()); + // Trigger HB immediately + context.getParent().triggerHeartbeat(); + } + } } diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 
a975cd5605fc..45a1db681542 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -239,6 +239,7 @@ message ContainerAction { message PipelineReport { required PipelineID pipelineID = 1; + required bool isLeader = 2; } message PipelineReportsProto { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 793f4e2a5e27..b8cb7b4246c7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -18,25 +18,23 @@ package org.apache.hadoop.hdds.scm.pipeline; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.Objects; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server - .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import 
org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Objects; +import com.google.common.base.Preconditions; /** * Handles Pipeline Reports from datanode. @@ -44,8 +42,8 @@ public class PipelineReportHandler implements EventHandler { - private static final Logger LOGGER = LoggerFactory - .getLogger(PipelineReportHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger( + PipelineReportHandler.class); private final PipelineManager pipelineManager; private final Configuration conf; private final SCMSafeModeManager scmSafeModeManager; @@ -62,7 +60,6 @@ public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager, this.pipelineAvailabilityCheck = conf.getBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT); - } @Override @@ -72,8 +69,8 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails(); PipelineReportsProto pipelineReport = pipelineReportFromDatanode.getReport(); - Preconditions.checkNotNull(dn, "Pipeline Report is " - + "missing DatanodeDetails."); + Preconditions.checkNotNull(dn, + "Pipeline Report is missing DatanodeDetails."); if (LOGGER.isTraceEnabled()) { LOGGER.trace("Processing pipeline report for dn: {}", dn); } @@ -89,7 +86,6 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, pipelineReportFromDatanode); } - } private void processPipelineReport(PipelineReport report, DatanodeDetails dn) @@ -104,16 +100,14 @@ private void processPipelineReport(PipelineReport report, DatanodeDetails dn) return; } + pipeline.reportDatanode(dn); + if (report.getIsLeader()) { + // Pipeline reported as the leader + pipeline.setLeaderId(dn.getUuid()); + } + if 
((pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) + && pipeline.isHealthy()) { + pipelineManager.openPipeline(pipelineID); + } - if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { - LOGGER.info("Pipeline {} reported by {}", pipeline.getId(), dn); - pipeline.reportDatanode(dn); - if (pipeline.isHealthy()) { - // if all the dns have reported, pipeline can be moved to OPEN state - pipelineManager.openPipeline(pipelineID); - } - } else { - // In OPEN state case just report the datanode - pipeline.reportDatanode(dn); - } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 76150579f849..2410b544581c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -52,9 +52,7 @@ class PipelineStateManager { void addPipeline(Pipeline pipeline) throws IOException { pipelineStateMap.addPipeline(pipeline); - if (pipeline.getPipelineState() == PipelineState.OPEN) { - LOG.info("Created pipeline " + pipeline); - } + LOG.info("Created pipeline " + pipeline); } void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID) @@ -131,8 +129,8 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException { throw new IOException("Closed pipeline can not be opened"); } if (pipeline.getPipelineState() == PipelineState.ALLOCATED) { - pipeline = pipelineStateMap - .updatePipelineState(pipelineId, PipelineState.OPEN); + pipeline = pipelineStateMap.updatePipelineState( + pipelineId, PipelineState.OPEN); LOG.info("Pipeline {} moved to OPEN state", pipeline.toString()); } return pipeline; 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 0324a58f13ab..94443dd7a193 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -153,30 +153,23 @@ public Pipeline create(ReplicationFactor factor) throws IOException { throw new InsufficientDatanodesException(e); } - Pipeline pipeline = Pipeline.newBuilder() - .setId(PipelineID.randomId()) - .setState(PipelineState.OPEN) - .setType(ReplicationType.RATIS) - .setFactor(factor) - .setNodes(dns) - .build(); + Pipeline pipeline = create(factor, dns); initializePipeline(pipeline); return pipeline; } @Override public Pipeline create(ReplicationFactor factor, - List nodes) { + List nodes) { return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(PipelineState.OPEN) + .setState(PipelineState.ALLOCATED) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(nodes) .build(); } - @Override public void shutdown() { forkJoinPool.shutdownNow(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java index 7a00d760fa4d..2f9a66f27b52 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java @@ -17,10 +17,11 @@ */ package org.apache.hadoop.hdds.scm.safemode; -import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; @@ -30,16 +31,13 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; - - -import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.TypedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Set; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * Class defining Safe mode exit criteria for Pipelines. @@ -54,17 +52,17 @@ public class HealthyPipelineSafeModeRule public static final Logger LOG = LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class); private final PipelineManager pipelineManager; - private final int healthyPipelineThresholdCount; + private int healthyPipelineThresholdCount; private int currentHealthyPipelineCount = 0; - private final Set processedDatanodeDetails = - new HashSet<>(); + private final Map processedPipelines = new HashMap<>(); + private final double healthyPipelinesPercent; HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue, PipelineManager pipelineManager, SCMSafeModeManager manager, Configuration configuration) { super(manager, ruleName, eventQueue); this.pipelineManager = pipelineManager; - double healthyPipelinesPercent = + healthyPipelinesPercent = configuration.getDouble(HddsConfigKeys. HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, HddsConfigKeys. 
@@ -94,6 +92,12 @@ public class HealthyPipelineSafeModeRule healthyPipelineThresholdCount); } + @VisibleForTesting + public void setHealthyPipelineThresholdCount(int actualPipelineCount) { + healthyPipelineThresholdCount = + (int) Math.ceil(healthyPipelinesPercent * actualPipelineCount); + } + @Override protected TypedEvent getEventType() { return SCMEvents.PROCESSED_PIPELINE_REPORT; @@ -116,46 +120,41 @@ protected void process(PipelineReportFromDatanode // processed report event, we should not consider this pipeline report // from datanode again during threshold calculation. Preconditions.checkNotNull(pipelineReportFromDatanode); - DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails(); - if (!processedDatanodeDetails.contains( - pipelineReportFromDatanode.getDatanodeDetails())) { + PipelineReportsProto pipelineReport = + pipelineReportFromDatanode.getReport(); + + for (PipelineReport report : pipelineReport.getPipelineReportList()) { + PipelineID pipelineID = PipelineID.getFromProtobuf( + report.getPipelineID()); Pipeline pipeline; - PipelineReportsProto pipelineReport = - pipelineReportFromDatanode.getReport(); - - for (PipelineReport report : pipelineReport.getPipelineReportList()) { - PipelineID pipelineID = PipelineID - .getFromProtobuf(report.getPipelineID()); - try { - pipeline = pipelineManager.getPipeline(pipelineID); - } catch (PipelineNotFoundException e) { - continue; - } + try { + pipeline = pipelineManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + continue; + } + if (!processedPipelines.containsKey(pipelineID)) { if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && - pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) { - // If the pipeline is open state mean, all 3 datanodes are reported - // for this pipeline. 
+ report.getIsLeader()) { + // If the pipeline gets reported with a leader we mark it as healthy currentHealthyPipelineCount++; getSafeModeMetrics().incCurrentHealthyPipelinesCount(); + processedPipelines.put(pipelineID, Boolean.TRUE); } } - if (scmInSafeMode()) { - SCMSafeModeManager.getLogger().info( - "SCM in safe mode. Healthy pipelines reported count is {}, " + - "required healthy pipeline reported count is {}", - currentHealthyPipelineCount, healthyPipelineThresholdCount); - } - - processedDatanodeDetails.add(dnDetails); } - + if (scmInSafeMode()) { + SCMSafeModeManager.getLogger().info( + "SCM in safe mode. Healthy pipelines reported count is {}, " + + "required healthy pipeline reported count is {}", + currentHealthyPipelineCount, healthyPipelineThresholdCount); + } } @Override protected void cleanup() { - processedDatanodeDetails.clear(); + processedPipelines.clear(); } @VisibleForTesting diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java index b9e53330691f..44d1c941774b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java @@ -102,8 +102,7 @@ public SafeModeHandler(Configuration configuration, */ @Override public void onMessage(SafeModeStatus safeModeStatus, - EventPublisher publisher) { - + EventPublisher publisher) { isInSafeMode.set(safeModeStatus.getSafeModeStatus()); scmClientProtocolServer.setSafeModeStatus(isInSafeMode.get()); scmBlockManager.setSafeModeStatus(isInSafeMode.get()); @@ -129,7 +128,7 @@ private void cleanupPipelines() { List pipelineList = scmPipelineManager.getPipelines(); pipelineList.forEach((pipeline) -> { try { - if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + if (!pipeline.isHealthy()) { 
scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false); } } catch (IOException ex) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index ea5d571d8501..a475f9b5e535 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -353,7 +353,9 @@ public static PipelineReportFromDatanode getPipelineReportFromDatanode( PipelineReportsProto.newBuilder(); for (PipelineID pipelineID : pipelineIDs) { reportBuilder.addPipelineReport( - PipelineReport.newBuilder().setPipelineID(pipelineID.getProtobuf())); + PipelineReport.newBuilder() + .setPipelineID(pipelineID.getProtobuf()) + .setIsLeader(false)); } return new PipelineReportFromDatanode(dn, reportBuilder.build()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index 01c53baf2bfd..342ee5bea7a2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.node.NodeManager; import java.io.IOException; +import java.util.List; /** * Mock Ratis Pipeline Provider for Mock Nodes. @@ -42,4 +45,16 @@ protected void initializePipeline(Pipeline pipeline) throws IOException { public void shutdown() { // Do nothing. 
} + + @Override + public Pipeline create(HddsProtos.ReplicationFactor factor, + List nodes) { + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.OPEN) + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(factor) + .setNodes(nodes) + .build(); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java index 94c3039d41d4..f6d9b0e7c8f3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java @@ -252,7 +252,8 @@ private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) { .newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); // Here no need to fire event from 3 nodes, as already pipeline is in // open state, but doing it. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java index ca54d0521135..7a099774e2bb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java @@ -188,7 +188,8 @@ private void firePipelineEvent(Pipeline pipeline) { PipelineReportsProto.newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) { eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index 247b38afc7f5..1e608b338168 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -357,7 +357,8 @@ private void firePipelineEvent(Pipeline pipeline) throws Exception { PipelineReportsProto.newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, new PipelineReportFromDatanode(pipeline.getNodes().get(0), reportBuilder.build())); @@ -493,7 +494,8 @@ public void testSafeModePipelineExitRule() throws Exception { PipelineReportsProto.Builder reportBuilder = 
PipelineReportsProto .newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); scmSafeModeManager = new SCMSafeModeManager( config, containers, pipelineManager, queue); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 2a486b1224ed..e65f197f3f52 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -21,16 +21,24 @@ import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -40,13 +48,6 @@ import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - /** * Test cases to verify PipelineManager. */ @@ -146,72 +147,6 @@ public void testRemovePipeline() throws IOException { pipelineManager.close(); } - @Test - public void testPipelineReport() throws IOException { - EventQueue eventQueue = new EventQueue(); - SCMPipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManager, eventQueue, null); - PipelineProvider mockRatisProvider = - new MockRatisPipelineProvider(nodeManager, - pipelineManager.getStateManager(), conf); - pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, - mockRatisProvider); - - SCMSafeModeManager scmSafeModeManager = - new SCMSafeModeManager(new OzoneConfiguration(), - new ArrayList<>(), pipelineManager, eventQueue); - - // create a pipeline in allocated state with no dns yet reported - Pipeline pipeline = pipelineManager - .createPipeline(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE); - Assert - .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); - - // get pipeline report from each dn in the pipeline - PipelineReportHandler pipelineReportHandler = - new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); - for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline is not healthy until all dns report - Assert.assertFalse( - pipelineManager.getPipeline(pipeline.getId()).isHealthy()); 
- pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); - } - - // pipeline is healthy when all dns report - Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - // pipeline should now move to open state - Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); - - // close the pipeline - pipelineManager.finalizeAndDestroyPipeline(pipeline, false); - - for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline report for destroyed pipeline should be ignored - pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); - } - - try { - pipelineManager.getPipeline(pipeline.getId()); - Assert.fail("Pipeline should not have been retrieved"); - } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains("not found")); - } - - // clean up - pipelineManager.close(); - } - @Test public void testPipelineCreationFailedMetric() throws Exception { MockNodeManager nodeManagerMock = new MockNodeManager(true, @@ -314,4 +249,70 @@ public void testActivateDeactivatePipeline() throws IOException { pipelineManager.close(); } + + @Test + public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { + EventQueue eventQueue = new EventQueue(); + SCMPipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, eventQueue, null); + PipelineProvider mockRatisProvider = + new MockRatisPipelineProvider(nodeManager, + pipelineManager.getStateManager(), conf); + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + mockRatisProvider); + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + // close manager + pipelineManager.close(); + // new pipeline manager loads the pipelines from the db in ALLOCATED state + pipelineManager = + new 
SCMPipelineManager(conf, nodeManager, eventQueue, null); + mockRatisProvider = + new MockRatisPipelineProvider(nodeManager, + pipelineManager.getStateManager(), conf); + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + mockRatisProvider); + Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, + pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); + + SCMSafeModeManager scmSafeModeManager = + new SCMSafeModeManager(new OzoneConfiguration(), + new ArrayList<>(), pipelineManager, eventQueue); + PipelineReportHandler pipelineReportHandler = + new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); + + // Report pipelines with leaders + List nodes = pipeline.getNodes(); + Assert.assertEquals(3, nodes.size()); + // Send report for all but no leader + nodes.forEach(dn -> + sendPipelineReport(dn, pipeline, pipelineReportHandler, false)); + + Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, + pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); + + nodes.subList(0, 2).forEach(dn -> + sendPipelineReport(dn, pipeline, pipelineReportHandler, false)); + sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, + pipelineReportHandler, true); + + Assert.assertEquals(Pipeline.PipelineState.OPEN, + pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); + } + + private void sendPipelineReport(DatanodeDetails dn, + Pipeline pipeline, PipelineReportHandler pipelineReportHandler, + boolean isLeader) { + + PipelineReportsProto.Builder reportProtoBuilder = + PipelineReportsProto.newBuilder(); + PipelineReport.Builder reportBuilder = PipelineReport.newBuilder(); + reportBuilder.setPipelineID(pipeline.getId().getProtobuf()); + reportBuilder.setIsLeader(isLeader); + + pipelineReportHandler.onMessage(new PipelineReportFromDatanode(dn, + reportProtoBuilder.addPipelineReport( + reportBuilder.build()).build()), new EventQueue()); + } } diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 8b2fc9200a68..2813711c2f28 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -28,6 +28,9 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -64,6 +67,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState @@ -143,10 +147,31 @@ public void waitForClusterToBeReady() throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(() -> { final int healthy = scm.getNodeCount(HEALTHY); - final boolean isReady = healthy == hddsDatanodes.size(); - LOG.info("{}. Got {} of {} DN Heartbeats.", - isReady? 
"Cluster is ready" : "Waiting for cluster to be ready", - healthy, hddsDatanodes.size()); + boolean isReady = healthy == hddsDatanodes.size(); + boolean printIsReadyMsg = true; + List pipelines = scm.getPipelineManager().getPipelines(); + if (!pipelines.isEmpty()) { + List raftPipelines = pipelines.stream().filter(p -> + p.getType() == HddsProtos.ReplicationType.RATIS).collect( + Collectors.toList()); + if (!raftPipelines.isEmpty()) { + List notOpenPipelines = raftPipelines.stream().filter(p -> + p.getPipelineState() != Pipeline.PipelineState.OPEN && + p.getPipelineState() != Pipeline.PipelineState.CLOSED) + .collect(Collectors.toList()); + if (notOpenPipelines.size() > 0) { + LOG.info("Waiting for {} number of pipelines out of {}, to report " + + "a leader.", notOpenPipelines.size(), raftPipelines.size()); + isReady = false; + printIsReadyMsg = false; + } + } + } + if (printIsReadyMsg) { + LOG.info("{}. Got {} of {} DN Heartbeats.", + isReady ? "Cluster is ready" : "Waiting for cluster to be ready", + healthy, hddsDatanodes.size()); + } return isReady; }, 1000, waitForClusterToBeReadyTimeout); } @@ -260,6 +285,19 @@ public void restartOzoneManager() throws IOException { ozoneManager.restart(); } + private void waitForHddsDatanodesStop() throws TimeoutException, + InterruptedException { + GenericTestUtils.waitFor(() -> { + final int healthy = scm.getNodeCount(HEALTHY); + boolean isReady = healthy == hddsDatanodes.size(); + if (!isReady) { + LOG.info("Waiting on {} datanodes out of {} to be marked unhealthy.", + healthy, hddsDatanodes.size()); + } + return isReady; + }, 1000, waitForClusterToBeReadyTimeout); + } + @Override public void restartHddsDatanode(int i, boolean waitForDatanode) throws InterruptedException, TimeoutException { @@ -279,7 +317,7 @@ public void restartHddsDatanode(int i, boolean waitForDatanode) hddsDatanodes.remove(i); if (waitForDatanode) { // wait for node to be removed from SCM healthy node list. 
- waitForClusterToBeReady(); + waitForHddsDatanodesStop(); } String[] args = new String[]{}; HddsDatanodeService service = @@ -512,7 +550,15 @@ StorageContainerManager createSCM() configureSCM(); SCMStorageConfig scmStore = new SCMStorageConfig(conf); initializeScmStorage(scmStore); - return StorageContainerManager.createSCM(conf); + StorageContainerManager scm = StorageContainerManager.createSCM(conf); + HealthyPipelineSafeModeRule rule = + scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule(); + if (rule != null) { + // Set threshold to wait for safe mode exit - this is needed since a + // pipeline is marked open only after leader election. + rule.setHealthyPipelineThresholdCount(numOfDatanodes / 3); + } + return scm; } private void initializeScmStorage(SCMStorageConfig scmStore) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java index 2f1ec66d6948..dcb45ad03f66 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.scm.pipeline; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; @@ -50,6 +51,8 @@ public class TestSCMPipelineMetrics { @Before public void setup() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, + Boolean.TRUE.toString()); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .build();