diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 2227df615637..bd7ee1f7af00 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -31,10 +31,13 @@ .ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -91,18 +94,23 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, .map(ContainerReplicaProto::getContainerID) .map(ContainerID::valueof).collect(Collectors.toSet()); + List containersForUpdate = + processContainerReplicas(datanodeDetails, replicas, publisher); + final Set missingReplicas = new HashSet<>(containersInSCM); missingReplicas.removeAll(containersInDn); - processContainerReplicas(datanodeDetails, replicas); processMissingReplicas(datanodeDetails, missingReplicas); - updateDeleteTransaction(datanodeDetails, replicas, publisher); + updateDeleteTransaction(datanodeDetails, containersForUpdate, publisher); /* * Update the latest set of containers for this datanode in * NodeManager */ - nodeManager.setContainers(datanodeDetails, containersInDn); + nodeManager.setContainers( + datanodeDetails, containersForUpdate.parallelStream() + .map(ContainerReplicaProto::getContainerID) + .map(ContainerID::valueof).collect(Collectors.toSet())); 
containerManager.notifyContainerReportProcessing(true, true); } catch (NodeNotFoundException ex) { @@ -119,21 +127,33 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, * @param datanodeDetails Datanode from which this report was received * @param replicas list of ContainerReplicaProto */ - private void processContainerReplicas(final DatanodeDetails datanodeDetails, - final List replicas) { + private List processContainerReplicas( + final DatanodeDetails datanodeDetails, + final List replicas, + final EventPublisher publisher) { + List containers = new ArrayList<>(); for (ContainerReplicaProto replicaProto : replicas) { try { processContainerReplica(datanodeDetails, replicaProto); + containers.add(replicaProto); } catch (ContainerNotFoundException e) { - LOG.error("Received container report for an unknown container" + - " {} from datanode {}.", replicaProto.getContainerID(), - datanodeDetails, e); + LOG.info("Received container report for an unknown container" + + " {} from datanode {}, delete it.", + replicaProto.getContainerID(), + datanodeDetails); + publisher.fireEvent( + SCMEvents.DATANODE_COMMAND, new CommandForDatanode<>( + datanodeDetails.getUuid(), + new DeleteContainerCommand(replicaProto.getContainerID(), + true))); + } catch (IOException e) { LOG.error("Exception while processing container report for container" + " {} from datanode {}.", replicaProto.getContainerID(), datanodeDetails, e); } } + return containers; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index f0411d2aac58..7157dd044835 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -20,11 +20,14 @@ import 
com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -72,6 +75,7 @@ public class SCMContainerManager implements ContainerManager { private final int numContainerPerOwnerInPipeline; private final SCMContainerManagerMetrics scmContainerManagerMetrics; + private final NodeManager nodeManager; /** * Constructs a mapping class that creates mapping between container names @@ -81,10 +85,13 @@ public class SCMContainerManager implements ContainerManager { * CacheSize is specified * in MB. * @param conf - {@link Configuration} + * @param nodeManager - NodeManager used to update the datanode-to-container + * mapping when containers are deleted. * @param pipelineManager - {@link PipelineManager} * @throws IOException on Failure. 
*/ public SCMContainerManager(final Configuration conf, + final NodeManager nodeManager, PipelineManager pipelineManager) throws IOException { final File metaDir = ServerUtils.getScmDbDir(conf); @@ -108,6 +115,7 @@ public SCMContainerManager(final Configuration conf, loadExistingContainers(); scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); + this.nodeManager = nodeManager; } private void loadExistingContainers() throws IOException { @@ -282,6 +290,21 @@ public ContainerInfo allocateContainer(final ReplicationType type, public void deleteContainer(ContainerID containerID) throws IOException { lock.lock(); try { + Set replicas = + containerStateManager.getContainerReplicas(containerID); + Set containerIdSet = Collections.singleton(containerID); + for (ContainerReplica replica : replicas){ + DatanodeDetails datanodeDetails = replica.getDatanodeDetails(); + try { + nodeManager.removeContainers(datanodeDetails, containerIdSet); + } catch (NodeNotFoundException nnfe){ + scmContainerManagerMetrics.incNumFailureDeleteContainers(); + throw new SCMException( + "Failed to delete container " + containerID + ", reason : " + + "DataNode " + datanodeDetails + " doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_NODE_IN_POOL); + } + } containerStateManager.removeContainer(containerID); final byte[] dbKey = Longs.toByteArray(containerID.getId()); final byte[] containerBytes = containerStore.get(dbKey); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index fd8bb87ceb12..5b5f656d19c0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -158,6 +158,16 @@ void setContainers(DatanodeDetails datanodeDetails, Set getContainers(DatanodeDetails datanodeDetails) throws NodeNotFoundException; 
+ /** + * Remove set of containers from the NodeManager. + * @param datanodeDetails - DatanodeDetails + * @param containerIds - Set of containerIDs + * @throws NodeNotFoundException - if datanode is not known. For new datanode + * use addDatanodeInContainerMap call. + */ + void removeContainers(DatanodeDetails datanodeDetails, + Set containerIds) throws NodeNotFoundException; + /** * Add a {@link SCMCommand} to the command queue, which are * handled by HB thread asynchronously. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 954cb0e8ea46..b8bcada2b310 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -492,6 +492,20 @@ public Set getContainers(UUID uuid) return nodeStateMap.getContainers(uuid); } + /** + * remove set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws NodeNotFoundException - if datanode is not known. + */ + public void removeContainers(UUID uuid, Set containerIds) + throws NodeNotFoundException{ + for (ContainerID containerID : containerIds){ + nodeStateMap.removeContainer(uuid, containerID); + } + } + + /** * Move Stale or Dead node to healthy if we got a heartbeat from them. * Move healthy nodes to stale nodes if it is needed. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index f077e72d4465..b313471b69d6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -563,6 +563,20 @@ public Set getContainers(DatanodeDetails datanodeDetails) return nodeStateManager.getContainers(datanodeDetails.getUuid()); } + /** + * Remove set of containers available on a datanode. + * + * @param datanodeDetails - DatanodeID + * @param containerIds - Set of containerIDs + * @throws NodeNotFoundException - if datanode is not known. For new datanode + * use addDatanodeInContainerMap call. + */ + @Override + public void removeContainers(DatanodeDetails datanodeDetails, + Set containerIds) throws NodeNotFoundException { + nodeStateManager.removeContainers(datanodeDetails.getUuid(), containerIds); + } + // TODO: // Since datanode commands are added through event queue, onMessage method // should take care of adding commands to command queue. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index f48fad19e3f7..1df41a4ed2dc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -158,6 +158,13 @@ public ScmContainerLocationResponse processRequest( .setScmCloseContainerResponse(closeContainer( request.getScmCloseContainerRequest())) .build(); + case DeleteContainer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setScmDeleteContainerResponse(deleteContainer( + request.getScmDeleteContainerRequest())) + .build(); case ListPipelines: return ScmContainerLocationResponse.newBuilder() .setCmdType(request.getCmdType()) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 7e76e6a85cd2..26592fe5551a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -406,7 +406,8 @@ private void initializeSystemManagers(OzoneConfiguration conf, if (configurator.getContainerManager() != null) { containerManager = configurator.getContainerManager(); } else { - containerManager = new SCMContainerManager(conf, pipelineManager); + containerManager = new SCMContainerManager( + conf, scmNodeManager, pipelineManager); } if (configurator.getScmBlockManager() != null) { diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 06dc67535720..8065a4700d45 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -333,6 +333,19 @@ public Set getContainers(DatanodeDetails uuid) { return node2ContainerMap.getContainers(uuid.getUuid()); } + /** + * Remove set of containers available on a datanode. + * + * @param datanodeDetails - DatanodeID + * @param containerIds - Set of containerIDs + * @throws NodeNotFoundException - if datanode is not known. For new datanode + * use addDatanodeInContainerMap call. + */ + public void removeContainers(DatanodeDetails datanodeDetails, + Set containerIds) throws NodeNotFoundException{ + + } + // Returns the number of commands that is queued to this node manager. 
public int getCommandCount(DatanodeDetails dd) { List list = commandMap.get(dd.getUuid()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index 612bf5dd99df..72c01661c3a4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -76,7 +76,8 @@ public static void setUp() throws Exception { pipelineManager.getStateManager(), configuration); pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider); - containerManager = new SCMContainerManager(configuration, pipelineManager); + containerManager = new SCMContainerManager( + configuration, nodeManager, pipelineManager); pipelineManager.triggerPipelineCreation(); eventQueue.addHandler(CLOSE_CONTAINER, new CloseContainerEventHandler(pipelineManager, containerManager)); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java index 405c582d32bb..49d8644d7066 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java @@ -97,7 +97,8 @@ public static void setUp() throws Exception { nodeManager = new MockNodeManager(true, 10); pipelineManager = new SCMPipelineManager(conf, nodeManager, new EventQueue()); - containerManager = new SCMContainerManager(conf, pipelineManager); + containerManager = new SCMContainerManager( + conf, nodeManager, pipelineManager); xceiverClientManager = new 
XceiverClientManager(conf); replicationFactor = SCMTestUtils.getReplicationFactor(conf); replicationType = SCMTestUtils.getReplicationType(conf); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 39058234a89f..2aff9c01cdb3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -109,7 +109,7 @@ SCMContainerManager createContainerManager(Configuration config, OZONE_SCM_DB_CACHE_SIZE_DEFAULT); PipelineManager pipelineManager = new SCMPipelineManager(config, scmNodeManager, eventQueue); - return new SCMContainerManager(config, pipelineManager); + return new SCMContainerManager(config, scmNodeManager, pipelineManager); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 0ecff3f541a7..f307c3d728a3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -213,6 +213,20 @@ public Set getContainers(DatanodeDetails uuid) { throw new UnsupportedOperationException("Not yet implemented"); } + /** + * Remove set of containers available on a datanode. + * + * @param datanodeDetails - DatanodeID + * @param containerIds - Set of containerIDs + * @throws NodeNotFoundException - if datanode is not known. For new datanode + * use addDatanodeInContainerMap call. 
+ */ + @Override + public void removeContainers(DatanodeDetails datanodeDetails, + Set containerIds) throws NodeNotFoundException { + throw new UnsupportedOperationException("Not yet implemented"); + } + /** * Closes this stream and releases any system resources associated * with it. If the stream is already closed then invoking this