@@ -31,10 +31,13 @@
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -91,18 +94,23 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
.map(ContainerReplicaProto::getContainerID)
.map(ContainerID::valueof).collect(Collectors.toSet());

List<ContainerReplicaProto> containersForUpdate =
processContainerReplicas(datanodeDetails, replicas, publisher);

final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
missingReplicas.removeAll(containersInDn);

processContainerReplicas(datanodeDetails, replicas);
processMissingReplicas(datanodeDetails, missingReplicas);
updateDeleteTransaction(datanodeDetails, replicas, publisher);
updateDeleteTransaction(datanodeDetails, containersForUpdate, publisher);

/*
* Update the latest set of containers for this datanode in
* NodeManager
*/
nodeManager.setContainers(datanodeDetails, containersInDn);
nodeManager.setContainers(
datanodeDetails, containersForUpdate.parallelStream()
.map(ContainerReplicaProto::getContainerID)
.map(ContainerID::valueof).collect(Collectors.toSet()));

containerManager.notifyContainerReportProcessing(true, true);
} catch (NodeNotFoundException ex) {
@@ -119,21 +127,33 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
* @param datanodeDetails Datanode from which this report was received
* @param replicas list of ContainerReplicaProto
*/
private void processContainerReplicas(final DatanodeDetails datanodeDetails,
final List<ContainerReplicaProto> replicas) {
private List<ContainerReplicaProto> processContainerReplicas(
final DatanodeDetails datanodeDetails,
final List<ContainerReplicaProto> replicas,
final EventPublisher publisher) {
List<ContainerReplicaProto> containers = new ArrayList<>();
for (ContainerReplicaProto replicaProto : replicas) {
try {
processContainerReplica(datanodeDetails, replicaProto);
containers.add(replicaProto);
} catch (ContainerNotFoundException e) {
LOG.error("Received container report for an unknown container" +
" {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
LOG.info("Received container report for an unknown container" +
" {} from datanode {},delete it.",
replicaProto.getContainerID(),
datanodeDetails);
publisher.fireEvent(
Contributor (@xiaoyuyao): Should we fireEvent upon receiving the scmcli request? Can we add some high-level design description of the delete flow to the PR description? That will help the reviewers.

Contributor (@avijayanhwx, Jan 15, 2020): +1 for @xiaoyuyao's description request. The delete container flow is quite important for Recon to work correctly, hence I am trying to understand this better. Will there be a case where a user will need to delete a container directly using SCM?

SCMEvents.DATANODE_COMMAND, new CommandForDatanode<>(
datanodeDetails.getUuid(),
new DeleteContainerCommand(replicaProto.getContainerID(),
true)));

} catch (IOException e) {
LOG.error("Exception while processing container report for container" +
" {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
}
}
return containers;
}
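For the flow the reviewers ask about above: in the standard SCM wiring, the DATANODE_COMMAND event fired here is queued by SCM's command infrastructure and delivered to the reporting datanode with its next heartbeat response, where the datanode-side delete handler removes the local replica. A minimal Java sketch of the SCM-side step, using only types already imported by this diff; the wrapper class and literal values are illustrative, not part of the PR:

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;

// Mirrors what processContainerReplicas now does for a replica whose
// container SCM does not know about: queue a force delete addressed to the
// datanode that reported it.
final class UnknownReplicaCleanupSketch {
  static void queueDelete(EventPublisher publisher, DatanodeDetails reporter,
      long containerId) {
    DeleteContainerCommand delete =
        new DeleteContainerCommand(containerId, true); // true == force
    publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
        new CommandForDatanode<>(reporter.getUuid(), delete));
  }
}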

@@ -20,11 +20,14 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -72,6 +75,7 @@ public class SCMContainerManager implements ContainerManager {
private final int numContainerPerOwnerInPipeline;

private final SCMContainerManagerMetrics scmContainerManagerMetrics;
private final NodeManager nodeManager;

/**
* Constructs a mapping class that creates mapping between container names
@@ -81,10 +85,13 @@ public class SCMContainerManager implements ContainerManager {
* CacheSize is specified
* in MB.
* @param conf - {@link Configuration}
* @param nodeManager - NodeManager so that we can get the nodes that are
* healthy to place new containers.
* @param pipelineManager - {@link PipelineManager}
* @throws IOException on Failure.
*/
public SCMContainerManager(final Configuration conf,
final NodeManager nodeManager,
PipelineManager pipelineManager) throws IOException {

final File metaDir = ServerUtils.getScmDbDir(conf);
@@ -108,6 +115,7 @@ public SCMContainerManager(final Configuration conf,
loadExistingContainers();

scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
this.nodeManager = nodeManager;
}

private void loadExistingContainers() throws IOException {
@@ -282,6 +290,21 @@ public ContainerInfo allocateContainer(final ReplicationType type,
public void deleteContainer(ContainerID containerID) throws IOException {
lock.lock();
try {
Set<ContainerReplica> replicas =
containerStateManager.getContainerReplicas(containerID);
Set<ContainerID> containerIdSet = Collections.singleton(containerID);
for (ContainerReplica replica : replicas) {
DatanodeDetails datanodeDetails = replica.getDatanodeDetails();
try {
nodeManager.removeContainers(datanodeDetails, containerIdSet);
} catch (NodeNotFoundException nnfe) {
scmContainerManagerMetrics.incNumFailureDeleteContainers();
throw new SCMException(
"Failed to delete container " + containerID + ", reason: " +
"DataNode " + datanodeDetails + " doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_NODE_IN_POOL);
}
}
containerStateManager.removeContainer(containerID);
final byte[] dbKey = Longs.toByteArray(containerID.getId());
final byte[] containerBytes = containerStore.get(dbKey);
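A hedged usage sketch of the delete path updated above, assuming a constructed SCMContainerManager named containerManager and an SLF4J LOG; the container id is a placeholder. After deleteContainer returns, the container is gone from containerStateManager, from the SCM DB, and, with this change, from each replica's datanode entry in NodeManager:

ContainerID cid = ContainerID.valueof(42L);
try {
  containerManager.deleteContainer(cid);
} catch (IOException e) {
  // With this change, a replica on an unregistered datanode surfaces as an
  // SCMException with result code FAILED_TO_FIND_NODE_IN_POOL.
  LOG.error("Delete of container {} failed.", cid, e);
}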
@@ -158,6 +158,16 @@ void setContainers(DatanodeDetails datanodeDetails,
Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
throws NodeNotFoundException;

/**
* Remove a set of containers from the NodeManager.
* @param datanodeDetails - DatanodeDetails
* @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known. For new datanode
* use addDatanodeInContainerMap call.
*/
void removeContainers(DatanodeDetails datanodeDetails,
Set<ContainerID> containerIds) throws NodeNotFoundException;

/**
* Add a {@link SCMCommand} to the command queue, which are
* handled by HB thread asynchronously.
@@ -492,6 +492,20 @@ public Set<ContainerID> getContainers(UUID uuid)
return nodeStateMap.getContainers(uuid);
}

/**
* Remove a set of containers available on a datanode.
* @param uuid - DatanodeID
* @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known.
*/
public void removeContainers(UUID uuid, Set<ContainerID> containerIds)
throws NodeNotFoundException {
for (ContainerID containerID : containerIds) {
nodeStateMap.removeContainer(uuid, containerID);
}
}

/**
* Move Stale or Dead node to healthy if we got a heartbeat from them.
* Move healthy nodes to stale nodes if it is needed.
@@ -563,6 +563,20 @@ public Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
return nodeStateManager.getContainers(datanodeDetails.getUuid());
}

/**
* Remove a set of containers available on a datanode.
*
* @param datanodeDetails - DatanodeID
* @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known. For new datanode
* use addDatanodeInContainerMap call.
*/
@Override
public void removeContainers(DatanodeDetails datanodeDetails,
Set<ContainerID> containerIds) throws NodeNotFoundException {
nodeStateManager.removeContainers(datanodeDetails.getUuid(), containerIds);
Contributor: Do we need to remove the container from the pipelineContainerMap in SCMPipelineManager?

Contributor (PR author): Before deleting a container, we should close it; when we close a container, the function SCMContainerManager#updateContainerState will remove the container from the pipelineContainerMap in SCMPipelineManager.

}
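A minimal sketch of the close-then-delete ordering the author describes, assuming the usual ContainerManager lifecycle calls; the container id is a placeholder:

// Close first: updateContainerState drops the container from
// SCMPipelineManager's pipeline-to-container map, so the later delete does
// not need to touch it.
ContainerID id = ContainerID.valueof(7L);
containerManager.updateContainerState(id, HddsProtos.LifeCycleEvent.FINALIZE);
containerManager.updateContainerState(id, HddsProtos.LifeCycleEvent.CLOSE);
containerManager.deleteContainer(id);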

// TODO:
// Since datanode commands are added through event queue, onMessage method
// should take care of adding commands to command queue.
@@ -158,6 +158,13 @@ public ScmContainerLocationResponse processRequest(
.setScmCloseContainerResponse(closeContainer(
request.getScmCloseContainerRequest()))
.build();
case DeleteContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setScmDeleteContainerResponse(deleteContainer(
request.getScmDeleteContainerRequest()))
.build();
case ListPipelines:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
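For orientation, a sketch of how a client might build the request that the new DeleteContainer case serves. The message and field names below are inferred from the getters in this hunk (getCmdType, getScmDeleteContainerRequest) and are assumptions, not names confirmed by the PR:

// Assumed names: Type, ScmDeleteContainerRequestProto, setContainerID.
ScmContainerLocationRequest request =
    ScmContainerLocationRequest.newBuilder()
        .setCmdType(Type.DeleteContainer)
        .setScmDeleteContainerRequest(
            ScmDeleteContainerRequestProto.newBuilder()
                .setContainerID(42L) // placeholder id
                .build())
        .build();
// On success the translator above replies with Status.OK and an
// ScmDeleteContainerResponse.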
@@ -406,7 +406,8 @@ private void initializeSystemManagers(OzoneConfiguration conf,
if (configurator.getContainerManager() != null) {
containerManager = configurator.getContainerManager();
} else {
containerManager = new SCMContainerManager(conf, pipelineManager);
containerManager = new SCMContainerManager(
conf, scmNodeManager, pipelineManager);
}

if (configurator.getScmBlockManager() != null) {
@@ -333,6 +333,19 @@ public Set<ContainerID> getContainers(DatanodeDetails uuid) {
return node2ContainerMap.getContainers(uuid.getUuid());
}

/**
* Remove a set of containers available on a datanode.
*
* @param datanodeDetails - DatanodeID
* @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known. For new datanode
* use addDatanodeInContainerMap call.
*/
public void removeContainers(DatanodeDetails datanodeDetails,
Set<ContainerID> containerIds) throws NodeNotFoundException {
// Intentionally a no-op: this mock does not track container removal.
}

// Returns the number of commands that is queued to this node manager.
public int getCommandCount(DatanodeDetails dd) {
List<SCMCommand> list = commandMap.get(dd.getUuid());
@@ -76,7 +76,8 @@ public static void setUp() throws Exception {
pipelineManager.getStateManager(), configuration);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = new SCMContainerManager(configuration, pipelineManager);
containerManager = new SCMContainerManager(
configuration, nodeManager, pipelineManager);
pipelineManager.triggerPipelineCreation();
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(pipelineManager, containerManager));
@@ -97,7 +97,8 @@ public static void setUp() throws Exception {
nodeManager = new MockNodeManager(true, 10);
pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue());
containerManager = new SCMContainerManager(conf, pipelineManager);
containerManager = new SCMContainerManager(
conf, nodeManager, pipelineManager);
xceiverClientManager = new XceiverClientManager(conf);
replicationFactor = SCMTestUtils.getReplicationFactor(conf);
replicationType = SCMTestUtils.getReplicationType(conf);
@@ -109,7 +109,7 @@ SCMContainerManager createContainerManager(Configuration config,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
PipelineManager pipelineManager =
new SCMPipelineManager(config, scmNodeManager, eventQueue);
return new SCMContainerManager(config, pipelineManager);
return new SCMContainerManager(config, scmNodeManager, pipelineManager);

}

@@ -213,6 +213,20 @@ public Set<ContainerID> getContainers(DatanodeDetails uuid) {
throw new UnsupportedOperationException("Not yet implemented");
}

/**
* Remove a set of containers available on a datanode.
*
* @param datanodeDetails - DatanodeID
* @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known. For new datanode
* use addDatanodeInContainerMap call.
*/
@Override
public void removeContainers(DatanodeDetails datanodeDetails,
Set<ContainerID> containerIds) throws NodeNotFoundException {
throw new UnsupportedOperationException("Not yet implemented");
}

/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this