Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,18 +44,23 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;


/**
* Class that manages Containers created on the datanode.
*/
public class ContainerSet {

private static final Logger LOG = LoggerFactory.getLogger(ContainerSet.class);
private final OzoneContainer ozoneContainer;

private final ConcurrentSkipListMap<Long, Container<?>> containerMap = new
ConcurrentSkipListMap<>();
private final ConcurrentSkipListSet<Long> missingContainerSet =
new ConcurrentSkipListSet<>();

public ContainerSet(OzoneContainer container) {
this.ozoneContainer = container;
}

/**
* Add Container to container map.
* @param container container to be added
Expand Down Expand Up @@ -226,8 +233,17 @@ public ContainerReportsProto getContainerReport() throws IOException {
ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();

for (Container<?> container: containers) {
crBuilder.addReports(container.getContainerReport());
for (Container<?> container : containers) {
// To handle tests where we set this to null
// in real world there is not reason for this to be
// null at all.
if (ozoneContainer != null) {
crBuilder.addReports(container.getContainerReport(
ozoneContainer.getGlobalNodeState()));
} else {
crBuilder.addReports(container.getContainerReport(
HddsProtos.NodeOperationalState.IN_SERVICE));
}
}

return crBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.common.helpers
Expand Down Expand Up @@ -149,8 +150,10 @@ void exportContainerData(OutputStream stream,

/**
* Returns containerReport for the container.
* @param nodeState
*/
ContainerReplicaProto getContainerReport()
ContainerReplicaProto getContainerReport(
HddsProtos.NodeOperationalState nodeState)
throws StorageContainerException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Expand Down Expand Up @@ -99,8 +100,8 @@ protected void sendICR(final Container container)
throws StorageContainerException {
IncrementalContainerReportProto icr = IncrementalContainerReportProto
.newBuilder()
.addReport(container.getContainerReport())
.build();
.addReport(container.getContainerReport(
HddsProtos.NodeOperationalState.IN_SERVICE)).build();
context.addReport(icr);
context.getParent().triggerHeartbeat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
.DeleteContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.ReplicateContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeStateCommandHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
Expand Down Expand Up @@ -126,6 +127,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
conf))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
.addHandler(new DeleteContainerCommandHandler())
.addHandler(new SetNodeStateCommandHandler(container))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SetNodeState;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;

/**
* Handles the set Node state command on the data node side.
*/
public class SetNodeStateCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(SetNodeStateCommandHandler.class);
private final OzoneContainer ozoneContainer;

/**
* Set Node State command handler.
*
* @param ozoneContainer - Ozone Container.
*/
public SetNodeStateCommandHandler(OzoneContainer ozoneContainer) {
this.ozoneContainer = ozoneContainer;
}

/**
* Handles a given SCM command.
*
* @param command - SCM Command
* @param container - Ozone Container.
* @param context - Current Context.
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
long startTime = Time.monotonicNow();
StorageContainerDatanodeProtocolProtos.SetNodeStateCommandProto
setNodeCmdProto = null;

if (command.getType() != Type.setNodeStateCommand) {
LOG.warn("Skipping handling command, expected command "
+ "type {} but found {}",
Type.setNodeStateCommand, command.getType());
return;
}
SetNodeState setNodeCmd = (SetNodeState) command;
setNodeCmdProto = setNodeCmd.getProto();
container.setGlobalNodeState(setNodeCmdProto.getNodeState());
}

/**
* Returns the command type that this command handler handles.
*
* @return Type
*/
@Override
public StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
getCommandType() {
return null;
}

/**
* Returns number of times this handler has been invoked.
*
* @return int
*/
@Override
public int getInvocationCount() {
return 0;
}

/**
* Returns the average time this function takes to run.
*
* @return long
*/
@Override
public long getAverageRunTime() {
return 0;
}

/**
* Default implementation for updating command status.
*
* @param context
* @param command
* @param cmdStatusUpdater
* @param log
*/
@Override
public void updateCommandStatus(
StateContext context, SCMCommand command,
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {

}
}
Loading