diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index ae0d3ea13b0a..4f25f43c829e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -22,12 +22,14 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,18 +44,23 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; - /** * Class that manages Containers created on the datanode. */ public class ContainerSet { private static final Logger LOG = LoggerFactory.getLogger(ContainerSet.class); + private final OzoneContainer ozoneContainer; private final ConcurrentSkipListMap> containerMap = new ConcurrentSkipListMap<>(); private final ConcurrentSkipListSet missingContainerSet = new ConcurrentSkipListSet<>(); + + public ContainerSet(OzoneContainer container) { + this.ozoneContainer = container; + } + /** * Add Container to container map. 
* @param container container to be added @@ -226,8 +233,17 @@ public ContainerReportsProto getContainerReport() throws IOException { ContainerReportsProto.Builder crBuilder = ContainerReportsProto.newBuilder(); - for (Container container: containers) { - crBuilder.addReports(container.getContainerReport()); + for (Container container : containers) { + // To handle tests where we set this to null; + // in the real world there is no reason for this to be + // null at all. + if (ozoneContainer != null) { + crBuilder.addReports(container.getContainerReport( + ozoneContainer.getGlobalNodeState())); + } else { + crBuilder.addReports(container.getContainerReport( + HddsProtos.NodeOperationalState.IN_SERVICE)); + } } return crBuilder.build(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java index 9d90aceaf5ae..8c0ed6c9ec38 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.common.helpers @@ -149,8 +150,10 @@ void exportContainerData(OutputStream stream, /** * Returns containerReport for the container. 
+ * @param nodeState */ - ContainerReplicaProto getContainerReport() + ContainerReplicaProto getContainerReport( + HddsProtos.NodeOperationalState nodeState) throws StorageContainerException; /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 8c3b981a0938..a9c819bc14de 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -31,6 +31,7 @@ .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -99,8 +100,8 @@ protected void sendICR(final Container container) throws StorageContainerException { IncrementalContainerReportProto icr = IncrementalContainerReportProto .newBuilder() - .addReport(container.getContainerReport()) - .build(); + .addReport(container.getContainerReport( + HddsProtos.NodeOperationalState.IN_SERVICE)).build(); context.addReport(icr); context.getParent().triggerHeartbeat(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c9eb7024eaf1..53c73a756b81 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -46,6 +46,7 @@ .DeleteContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .ReplicateContainerCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeStateCommandHandler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.replication.ContainerReplicator; @@ -126,6 +127,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) .addHandler(new DeleteContainerCommandHandler()) + .addHandler(new SetNodeStateCommandHandler(container)) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeStateCommandHandler.java new file mode 100644 index 000000000000..0220251ec821 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeStateCommandHandler.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto. 
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.commands.SetNodeState; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Consumer; + +/** + * Handles the set Node state command on the data node side. + */ +public class SetNodeStateCommandHandler implements CommandHandler { + private static final Logger LOG = + LoggerFactory.getLogger(SetNodeStateCommandHandler.class); + private final OzoneContainer ozoneContainer; + + /** + * Set Node State command handler. + * + * @param ozoneContainer - Ozone Container. + */ + public SetNodeStateCommandHandler(OzoneContainer ozoneContainer) { + this.ozoneContainer = ozoneContainer; + } + + /** + * Handles a given SCM command. + * + * @param command - SCM Command + * @param container - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. 
+ */ + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + long startTime = Time.monotonicNow(); + StorageContainerDatanodeProtocolProtos.SetNodeStateCommandProto + setNodeCmdProto = null; + + if (command.getType() != Type.setNodeStateCommand) { + LOG.warn("Skipping handling command, expected command " + + "type {} but found {}", + Type.setNodeStateCommand, command.getType()); + return; + } + SetNodeState setNodeCmd = (SetNodeState) command; + setNodeCmdProto = setNodeCmd.getProto(); + container.setGlobalNodeState(setNodeCmdProto.getNodeState()); + } + + /** + * Returns the command type that this command handler handles. + * + * @return Type + */ + @Override + public StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type + getCommandType() { + return Type.setNodeStateCommand; + } + + /** + * Returns number of times this handler has been invoked. + * + * @return int + */ + @Override + public int getInvocationCount() { + return 0; + } + + /** + * Returns the average time this function takes to run. + * + * @return long + */ + @Override + public long getAverageRunTime() { + return 0; + } + + /** + * Default implementation for updating command status. 
+ * + * @param context + * @param command + * @param cmdStatusUpdater + * @param log + */ + @Override + public void updateCommandStatus( + StateContext context, SCMCommand command, + Consumer cmdStatusUpdater, Logger log) { + + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 6317e6356fe3..7b0c53879a3b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -36,6 +36,7 @@ .ContainerDataProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.common.helpers @@ -60,6 +61,7 @@ import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; + import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.CONTAINER_ALREADY_EXISTS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -108,6 +110,11 @@ public KeyValueContainer(KeyValueContainerData containerData, Configuration this.containerData = containerData; } + static File getContainerFile(String metadataPath, long containerId) { + return new File(metadataPath, + containerId + OzoneConsts.CONTAINER_EXTENSION); + } + @Override public void create(VolumeSet volumeSet, VolumeChoosingPolicy volumeChoosingPolicy, String scmId) throws StorageContainerException { @@ -213,8 +220,8 @@ public void populatePathFields(String scmId, * Writes to .container file. 
* * @param containerFile container file name - * @param isCreate True if creating a new file. False is updating an - * existing container file. + * @param isCreate True if creating a new file. False is updating an existing + * container file. * @throws StorageContainerException */ private void writeToContainerFile(File containerFile, boolean isCreate) @@ -260,7 +267,6 @@ private void updateContainerFile(File containerFile) writeToContainerFile(containerFile, false); } - @Override public void delete() throws StorageContainerException { long containerId = containerData.getContainerID(); @@ -352,7 +358,6 @@ public void updateDataScanTimestamp(Instant time) } /** - * * Must be invoked with the writeLock held. * * @param update @@ -382,7 +387,7 @@ private void updateContainerData(Runnable update) private void compactDB() throws StorageContainerException { try { - try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) { + try (ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) { db.getStore().compactDB(); } } catch (StorageContainerException ex) { @@ -410,7 +415,7 @@ private void flushAndSyncDB() throws StorageContainerException { } @Override - public KeyValueContainerData getContainerData() { + public KeyValueContainerData getContainerData() { return containerData; } @@ -433,7 +438,7 @@ public void update( // holding lock and writing data to disk. We can have async implementation // to flush the update container data to disk. long containerId = containerData.getContainerID(); - if(!containerData.isValid()) { + if (!containerData.isValid()) { LOG.debug("Invalid container data. ContainerID: {}", containerId); throw new StorageContainerException("Invalid container data. 
" + "ContainerID: " + containerId, INVALID_CONTAINER_STATE); @@ -454,7 +459,7 @@ public void update( File containerFile = getContainerFile(); // update the new container data to .container File updateContainerFile(containerFile); - } catch (StorageContainerException ex) { + } catch (StorageContainerException ex) { containerData.setMetadata(oldMetadata); throw ex; } finally { @@ -468,7 +473,7 @@ public void updateDeleteTransactionId(long deleteTransactionId) { } @Override - public KeyValueBlockIterator blockIterator() throws IOException{ + public KeyValueBlockIterator blockIterator() throws IOException { return new KeyValueBlockIterator(containerData.getContainerID(), new File( containerData.getContainerPath())); } @@ -597,6 +602,7 @@ public boolean hasWriteLock() { /** * Acquire read lock, unless interrupted while waiting. + * * @throws InterruptedException */ @Override @@ -606,6 +612,7 @@ public void readLockInterruptibly() throws InterruptedException { /** * Acquire write lock, unless interrupted while waiting. + * * @throws InterruptedException */ @Override @@ -616,17 +623,13 @@ public void writeLockInterruptibly() throws InterruptedException { /** * Returns containerFile. + * * @return .container File name */ @Override public File getContainerFile() { return getContainerFile(containerData.getMetadataPath(), - containerData.getContainerID()); - } - - static File getContainerFile(String metadataPath, long containerId) { - return new File(metadataPath, - containerId + OzoneConsts.CONTAINER_EXTENSION); + containerData.getContainerID()); } @Override @@ -639,12 +642,14 @@ public long getBlockCommitSequenceId() { return containerData.getBlockCommitSequenceId(); } - /** * Returns KeyValueContainerReport for the KeyValueContainer. 
+ * + * @param nodeState */ @Override - public ContainerReplicaProto getContainerReport() + public ContainerReplicaProto getContainerReport( + HddsProtos.NodeOperationalState nodeState) throws StorageContainerException { ContainerReplicaProto.Builder ciBuilder = ContainerReplicaProto.newBuilder(); @@ -655,7 +660,7 @@ public ContainerReplicaProto getContainerReport() .setWriteBytes(containerData.getWriteBytes()) .setKeyCount(containerData.getKeyCount()) .setUsed(containerData.getBytesUsed()) - .setState(getHddsState()) + .setState(getHddsState(nodeState)) .setDeleteTransactionId(containerData.getDeleteTransactionId()) .setBlockCommitSequenceId(containerData.getBlockCommitSequenceId()) .setOriginNodeId(containerData.getOriginNodeId()); @@ -664,37 +669,60 @@ public ContainerReplicaProto getContainerReport() /** * Returns LifeCycle State of the container. + * * @return LifeCycle State of the container in HddsProtos format * @throws StorageContainerException */ - private ContainerReplicaProto.State getHddsState() + private ContainerReplicaProto.State getHddsState( + HddsProtos.NodeOperationalState nodeState) throws StorageContainerException { - ContainerReplicaProto.State state; - switch (containerData.getState()) { - case OPEN: - state = ContainerReplicaProto.State.OPEN; - break; - case CLOSING: - state = ContainerReplicaProto.State.CLOSING; - break; - case QUASI_CLOSED: - state = ContainerReplicaProto.State.QUASI_CLOSED; - break; - case CLOSED: - state = ContainerReplicaProto.State.CLOSED; - break; - case UNHEALTHY: - state = ContainerReplicaProto.State.UNHEALTHY; - break; - default: - throw new StorageContainerException("Invalid Container state found: " + - containerData.getContainerID(), INVALID_CONTAINER_STATE); + ContainerReplicaProto.State state = ContainerReplicaProto.State.INVALID; + if (nodeState == HddsProtos.NodeOperationalState.IN_SERVICE) { + switch (containerData.getState()) { + case OPEN: + state = ContainerReplicaProto.State.OPEN; + break; + case 
CLOSING: + state = ContainerReplicaProto.State.CLOSING; + break; + case QUASI_CLOSED: + state = ContainerReplicaProto.State.QUASI_CLOSED; + break; + case CLOSED: + state = ContainerReplicaProto.State.CLOSED; + break; + case UNHEALTHY: + state = ContainerReplicaProto.State.UNHEALTHY; + break; + default: + throw new StorageContainerException("Invalid Container state found: " + + containerData.getContainerID(), INVALID_CONTAINER_STATE); + } + } + + // If this node has been marked as Decommissioned or is under going + // Decommission, we report all containers as DECOM_PLEASE_IGNORE state. + // This allows SCM to know that these containers need to be managed in + // a different way. + if (nodeState == HddsProtos.NodeOperationalState.DECOMMISSIONED || + nodeState == HddsProtos.NodeOperationalState.DECOMMISSIONING) { + state = ContainerReplicaProto.State.DECOM_PLEASE_IGNORE; + } + + // If we are in maintenance mode, this is a request to SCM/Replica + // manager to not start replication immediately; since we are making a + // promise to the SCM that these containers will soon be operational. + + if (nodeState == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE || + nodeState == HddsProtos.NodeOperationalState.IN_MAINTENANCE) { + state = ContainerReplicaProto.State.MAINT_CONSIDER_AVAILABLE; } return state; } /** * Returns container DB file. + * * @return */ public File getContainerDBFile() { @@ -731,19 +759,20 @@ public boolean scanData(DataTransferThrottler throttler, Canceler canceler) { return checker.fullCheck(throttler, canceler); } - private enum ContainerCheckLevel { - NO_CHECK, FAST_CHECK, FULL_CHECK - } - /** * Creates a temporary file. 
+ * * @param file * @return * @throws IOException */ - private File createTempFile(File file) throws IOException{ + private File createTempFile(File file) throws IOException { return File.createTempFile("tmp_" + System.currentTimeMillis() + "_", file.getName(), file.getParentFile()); } + private enum ContainerCheckLevel { + NO_CHECK, FAST_CHECK, FULL_CHECK + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 6bce22cc7d21..912ef55b99b4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -24,10 +24,11 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto .ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.security.token.BlockTokenVerifier; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; @@ -77,12 +78,19 @@ public class OzoneContainer { private final XceiverServerSpi writeChannel; private final XceiverServerSpi readChannel; private final ContainerController controller; + private final BlockDeletingService blockDeletingService; private ContainerMetadataScanner metadataScanner; private List dataScanners; - private final BlockDeletingService blockDeletingService; + // TODO: This datanode's current state. 
We will read this from datanode.yaml + // on reboot, and this variable is set up via SetNodeStateCommand from + // SCM. If there is no value in datanode.yaml, we will assume that + // this is an in-service node. Right now hard coded to in_service. + private HddsProtos.NodeOperationalState globalNodeState = + HddsProtos.NodeOperationalState.IN_SERVICE; /** * Construct OzoneContainer object. + * * @param datanodeDetails * @param conf * @param certClient @@ -92,9 +100,11 @@ public class OzoneContainer { public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration conf, StateContext context, CertificateClient certClient) throws IOException { + + this.config = conf; this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); - this.containerSet = new ContainerSet(); + this.containerSet = new ContainerSet(this); this.metadataScanner = null; buildContainerSet(); @@ -108,7 +118,7 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration SecurityConfig secConf = new SecurityConfig(conf); this.hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet, - handlers, context, metrics, secConf.isBlockTokenEnabled()? + handlers, context, metrics, secConf.isBlockTokenEnabled() ? new BlockTokenVerifier(secConf, certClient) : null); /* @@ -136,6 +146,24 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration TimeUnit.MILLISECONDS, config); } + /** + * Get this datanode's current operational Status. + * + * @return - IN_SERVICE, DECOMM, Maint etc. + */ + public HddsProtos.NodeOperationalState getGlobalNodeState() { + return globalNodeState; + } + + /** + * Set this nodes Operational Status. + * + * @param globalNodeState - by default IN_SERVICE. 
+ */ + public void setGlobalNodeState(HddsProtos.NodeOperationalState globalNodeState) { + this.globalNodeState = globalNodeState; + } + private GrpcReplicationService createReplicationService() { return new GrpcReplicationService( new OnDemandContainerReplicationSource(controller)); @@ -169,7 +197,6 @@ private void buildContainerSet() { } - /** * Start background daemon thread for performing container integrity checks. */ @@ -240,13 +267,14 @@ public void stop() { ContainerMetrics.remove(); } - @VisibleForTesting public ContainerSet getContainerSet() { return containerSet; } + /** * Returns container report. + * * @return - container report. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeState.java new file mode 100644 index 000000000000..f4d6afdbf24c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeState.java @@ -0,0 +1,55 @@ +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SetNodeStateCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; + +/** + * This allows SCM to send a command to Datanode asking to set its Node State. + *

+ A Node State can be the following: IN_SERVICE = 1; DECOMMISSIONING = 2; + DECOMMISSIONED = 3; MAINTENANCE = 4; Once the node state change is handled + by the data node; the data node will report its state back to the SCM that + allows the changes to be propagated back into SCM via the standard SCM + architecture. + *

+ * SCM today listens to the data node reports to learn the state of nodes and + * containers. + */ +public class SetNodeState extends SCMCommand { + private final HddsProtos.NodeOperationalState state; + + /** + * Ctor that creates a set node state command. + * + * @param id - Command ID. Something a time stamp would suffice. + * @param state - NodeState that want the node to be set into. + */ + public SetNodeState(long id, HddsProtos.NodeOperationalState state) { + super(id); + this.state = state; + } + + /** + * Returns the type of this command. + * + * @return Type - This is setNodeStateCommand. + */ + @Override + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.setNodeStateCommand; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. + */ + @Override + public SetNodeStateCommandProto getProto() { + return SetNodeStateCommandProto.newBuilder() + .setNodeState(state).build(); + } +} diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index a975cd5605fc..ed16c01140f4 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -177,6 +177,7 @@ message IncrementalContainerReportProto { repeated ContainerReplicaProto report = 1; } + message ContainerReplicaProto { enum State { OPEN = 1; @@ -185,6 +186,14 @@ message ContainerReplicaProto { CLOSED = 4; UNHEALTHY = 5; INVALID = 6; + // This container is marked as Decomm and should be + // ignored by the SCM and Replica manager, in other words, this is a + // UNHEALTHY container + DECOM_PLEASE_IGNORE = 7; + + // This container should be treated as Available unless the total count + // of HEALTHY replicas goes below 1. 
+ MAINT_CONSIDER_AVAILABLE = 8; } required int64 containerID = 1; required State state = 2; @@ -282,6 +291,7 @@ message SCMCommandProto { closeContainerCommand = 3; deleteContainerCommand = 4; replicateContainerCommand = 5; + setNodeStateCommand = 6; } // TODO: once we start using protoc 3.x, refactor this message using "oneof" required Type commandType = 1; @@ -290,6 +300,7 @@ message SCMCommandProto { optional CloseContainerCommandProto closeContainerCommandProto = 4; optional DeleteContainerCommandProto deleteContainerCommandProto = 5; optional ReplicateContainerCommandProto replicateContainerCommandProto = 6; + optional SetNodeStateCommandProto setNodeStateCommandProto = 7; } /** @@ -358,6 +369,12 @@ message ReplicateContainerCommandProto { required int64 cmdId = 3; } + + +message SetNodeStateCommandProto { + required NodeOperationalState nodeState = 1; +} + /** * Protocol used from a datanode to StorageContainerManager. * diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index a92f23613820..a025554062ac 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -79,7 +79,7 @@ public void before() throws Exception { pipelineID.getId().toString(), null); container = new KeyValueContainer(data, new OzoneConfiguration()); - ContainerSet containerSet = new ContainerSet(); + ContainerSet containerSet = new ContainerSet(null); containerSet.addContainer(container); containerHandler = mock(Handler.class); diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index f63de207d452..5594332d19f8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.hdds.utils.MetadataStoreBuilder; @@ -338,14 +340,17 @@ public void testCloseContainer() throws Exception { @Test public void testReportOfUnhealthyContainer() throws Exception { keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); - Assert.assertNotNull(keyValueContainer.getContainerReport()); + Assert.assertNotNull( + keyValueContainer.getContainerReport( + HddsProtos.NodeOperationalState.IN_SERVICE)); keyValueContainer.markContainerUnhealthy(); File containerFile = keyValueContainer.getContainerFile(); keyValueContainerData = (KeyValueContainerData) ContainerDataYaml .readContainerFile(containerFile); assertEquals(ContainerProtos.ContainerDataProto.State.UNHEALTHY, keyValueContainerData.getState()); - Assert.assertNotNull(keyValueContainer.getContainerReport()); + Assert.assertNotNull(keyValueContainer.getContainerReport( + HddsProtos.NodeOperationalState.IN_SERVICE)); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 2c71fef11a64..a68745d43801 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -233,7 +233,7 @@ public void testVolumeSetInKeyValueHandler() throws Exception{ conf.set(HDDS_DATANODE_DIR_KEY, path.getAbsolutePath()); VolumeSet volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf); try { - ContainerSet cset = new ContainerSet(); + ContainerSet cset = new ContainerSet(null); int[] interval = new int[1]; interval[0] = 2; ContainerMetrics metrics = new ContainerMetrics(interval); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index c3d3b17aefa5..8cb835bdd386 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -45,7 +45,7 @@ public class TestReplicationSupervisor { @Test public void normal() throws Exception { //GIVEN - ContainerSet set = new ContainerSet(); + ContainerSet set = new ContainerSet(null); FakeReplicator replicator = new FakeReplicator(set); ReplicationSupervisor supervisor = diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java index fbc5981eeea7..7a464255478a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java @@ -20,12 +20,15 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.ozone.protocol.commands.SetNodeState; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -239,6 +242,12 @@ public synchronized void startDecommission(DatanodeDetails dn) nodeManager.setNodeOperationalState( dn, NodeOperationalState.DECOMMISSIONING); monitor.startMonitoring(dn, 0); + + // Create a DECOMMISSIONING Command and send it to the datanode. + nodeManager.addDatanodeCommand(dn.getUuid(), + new SetNodeState(Time.monotonicNow(), + NodeOperationalState.DECOMMISSIONING)); + } else if (opState == NodeOperationalState.DECOMMISSIONING || opState == NodeOperationalState.DECOMMISSIONED) { LOG.info("Start Decommission called on node {} in state {}. 
Nothing to "+ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java index 623b11d22d29..1e5776bff75b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; @@ -129,7 +130,8 @@ public void testBCSID() throws Exception { cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() .getContainer().getContainerSet() .getContainer(omKeyLocationInfo.getContainerID()) - .getContainerReport().getBlockCommitSequenceId(); + .getContainerReport(HddsProtos.NodeOperationalState.IN_SERVICE) + .getBlockCommitSequenceId(); Assert.assertTrue(blockCommitSequenceId > 0); // make sure the persisted block Id in OM is same as that seen in the @@ -143,6 +145,7 @@ public void testBCSID() throws Exception { cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() .getContainer().getContainerSet() .getContainer(omKeyLocationInfo.getContainerID()) - .getContainerReport().getBlockCommitSequenceId()); + .getContainerReport(HddsProtos.NodeOperationalState.IN_SERVICE) + .getBlockCommitSequenceId()); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java index b872516474ff..625e63c8b7bb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java @@ -77,7 +77,7 @@ public void testRandomChoosingPolicy() throws IOException { RandomContainerDeletionChoosingPolicy.class.getName()); List pathLists = new LinkedList<>(); pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath())); - containerSet = new ContainerSet(); + containerSet = new ContainerSet(null); int numContainers = 10; for (int i = 0; i < numContainers; i++) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index ed482093dfbf..cb51855cfc21 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -127,7 +127,7 @@ public static void shutdown() throws IOException { @Before public void setupPaths() throws IOException { - containerSet = new ContainerSet(); + containerSet = new ContainerSet(null); volumeSet = new VolumeSet(DATANODE_UUID, conf); blockManager = new BlockManagerImpl(conf); chunkManager = new ChunkManagerImpl(true); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java index 28da1346a5e0..c7bfaecdca83 100644 --- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java @@ -103,7 +103,7 @@ public void initialize() throws IOException { // data directory conf.set("dfs.datanode.data.dir", baseDir + File.separator + "data"); - ContainerSet containerSet = new ContainerSet(); + ContainerSet containerSet = new ContainerSet(null); volumeSet = new VolumeSet(datanodeUuid, conf); StateContext context = new StateContext( conf, DatanodeStates.RUNNING, null);