From a06b7a4d3d26f1d70b77d56d88726c40233b3d30 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sun, 2 Mar 2025 18:44:50 +0800 Subject: [PATCH 01/16] Add node state change notification for ReplicationManager --- .../replication/ReplicationManager.java | 26 ++++++++ .../replication/ReplicationQueue.java | 4 ++ .../replication/TestReplicationManager.java | 61 +++++++++++++++++++ 3 files changed, 91 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 0dee75f559e5..e6320e3b4254 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -896,6 +896,10 @@ public ReplicationManagerReport getContainerReport() { return containerReport; } + public boolean isThreadWaiting() { + return replicationMonitor.getState() == Thread.State.TIMED_WAITING; + } + /** * ReplicationMonitor thread runnable. This wakes up at configured * interval and processes all the containers in the system. @@ -1470,5 +1474,27 @@ public boolean hasHealthyPipeline(ContainerInfo container) { return false; } } + + /** + * Notify the ReplicationManager that a node state has changed, which might + * require container replication. This will wake up the replication monitor + * thread if it's sleeping and there's no active replication work in progress. + */ + public synchronized void notifyNodeStateChange() { + if (!running || serviceStatus == ServiceStatus.PAUSING) { + return; + } + + // Only wake up the thread if there's no active replication work + // This prevents creating a new replication queue over and over + // when multiple nodes change state in quick succession + if (getQueue().isEmpty()) { + LOG.info("Waking up replication monitor due to node state change"); + // Notify the replication monitor thread to wake up + notify(); + } else { + LOG.info("Replication queue is not empty, not waking up replication monitor"); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java index 9e17d6747936..ee869515ced2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java @@ -97,4 +97,8 @@ public int overReplicatedQueueSize() { return overRepQueue.size(); } + public boolean isEmpty() { + return underRepQueue.isEmpty() && overRepQueue.isEmpty(); + } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index e0a4130021d1..7b671e6c2144 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -63,6 +63,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeoutException; +import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -99,6 +101,7 @@ import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Lists; +import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.TestClock; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.junit.jupiter.api.AfterEach; @@ -106,6 +109,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; /** @@ -1768,6 +1772,63 @@ public void testPendingOpExpiry() throws ContainerNotFoundException { assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline()); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty) + throws IOException, InterruptedException, ReflectiveOperationException, TimeoutException { + + AtomicBoolean processAllCalled = new AtomicBoolean(false); + ReplicationQueue queue = mock(ReplicationQueue.class); + when(queue.isEmpty()).thenReturn(queueIsEmpty); + final ReplicationManager customRM = new ReplicationManager( + configuration, + containerManager, + ratisPlacementPolicy, + ecPlacementPolicy, + eventPublisher, + scmContext, + nodeManager, + clock, + containerReplicaPendingOps) { + @Override + public ReplicationQueue getQueue() { + return queue; + } + + @Override + public synchronized void processAll() { + processAllCalled.set(true); + } + }; + + customRM.notifyStatusChanged(); + customRM.start(); + + // wait for the thread to become TIMED_WAITING + GenericTestUtils.waitFor( + () -> customRM.isThreadWaiting(), + 100, + 1000); + + // The processAll method will be called when the ReplicationManager's run + // method is executed by the replicationMonitor thread. 
+ assertTrue(processAllCalled.get()); + processAllCalled.set(false); + + customRM.notifyNodeStateChange(); + + GenericTestUtils.waitFor( + () -> customRM.isThreadWaiting(), + 100, + 1000); + + // If the queue is empty, the processAll method should have been called + assertEquals(processAllCalled.get(), queueIsEmpty); + + customRM.stop(); + } + @SafeVarargs private final Set addReplicas(ContainerInfo container, ContainerReplicaProto.State replicaState, From 27ccf552e598b39ffbbf0b2806de4641a1ae4ee4 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sun, 2 Mar 2025 20:40:15 +0800 Subject: [PATCH 02/16] Node state change in NodeManager should notify ReplicationManager --- .../hadoop/hdds/scm/node/NodeManager.java | 7 +++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 19 ++++++ .../scm/server/StorageContainerManager.java | 1 + .../hdds/scm/container/MockNodeManager.java | 5 ++ .../scm/container/SimpleMockNodeManager.java | 5 ++ .../hdds/scm/node/TestSCMNodeManager.java | 60 +++++++++++++++++++ .../testutils/ReplicationNodeManagerMock.java | 5 ++ 7 files changed, 102 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e6a74b395f77..a75adfe48673 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -74,6 +75,12 @@ public interface NodeManager extends StorageContainerNodeProtocol, EventHandler, NodeManagerMXBean, Closeable { + /** + * TODO: remove cyclic dependency between NodeManager and ReplicationManager + * Set the ReplicationManager + * @param rm ReplicationManager instance + */ + void setReplicationManager(ReplicationManager rm); /** * Register API without a layout version info object passed in. 
Useful for diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 43d13e4ae6de..60061784ddb9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -134,6 +135,7 @@ public class SCMNodeManager implements NodeManager { private final SCMContext scmContext; private final Map>> sendCommandNotifyMap; + private volatile ReplicationManager replicationManager; /** * Lock used to synchronize some operation in Node manager to ensure a @@ -295,6 +297,10 @@ public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) return nodeStateManager.getNodeStatus(datanodeDetails); } + public void setReplicationManager(ReplicationManager rm) { + replicationManager = rm; + } + /** * Set the operation state of a node. * @param datanodeDetails The datanode to set the new state for @@ -318,8 +324,21 @@ public void setNodeOperationalState(DatanodeDetails datanodeDetails, public void setNodeOperationalState(DatanodeDetails datanodeDetails, NodeOperationalState newState, long opStateExpiryEpocSec) throws NodeNotFoundException { + NodeOperationalState oldState = getNodeStatus(datanodeDetails).getOperationalState(); + nodeStateManager.setNodeOperationalState( datanodeDetails, newState, opStateExpiryEpocSec); + + if (replicationManager != null && oldState != newState) { + // Notify when a node is entering maintenance, decommissioning or back to service + if (newState == NodeOperationalState.ENTERING_MAINTENANCE + || newState == NodeOperationalState.DECOMMISSIONING + || newState == NodeOperationalState.IN_SERVICE) { + LOG.info("Notifying ReplicationManager of node state change for {}: {} -> {}", + datanodeDetails, oldState, newState); + replicationManager.notifyNodeStateChange(); + } + } } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index cd8da86f7e4c..d66b16072c5a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -818,6 +818,7 @@ private void initializeSystemManagers(OzoneConfiguration conf, containerReplicaPendingOps); reconfigurationHandler.register(replicationManager.getConfig()); } + scmNodeManager.setReplicationManager(replicationManager); serviceManager.register(replicationManager); // RM gets notified of expired pending delete from containerReplicaPendingOps by subscribing to it // so it can resend them. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 244f86e79540..648a86574ed7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -238,6 +239,10 @@ private void populateNodeMetric(DatanodeDetails datanodeDetails, int x) { } + @Override + public void setReplicationManager(ReplicationManager rm) { + } + /** * Gets all Live Datanodes that is currently communicating with SCM. * diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index ea1054784d09..234eb69dac71 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; @@ -94,6 +95,10 @@ public void setPipelines(DatanodeDetails dd, int count) { pipelineMap.put(dd.getUuid(), pipelines); } + @Override + public void setReplicationManager(ReplicationManager rm) { + } + /** * If the given node was registered with the nodeManager, return the * NodeStatus for the node. 
Otherwise return a NodeStatus of "In Service diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 25802ddb8131..16acbbdaa9e1 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -2038,4 +2039,63 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname() assertEquals(emptyList(), nodeManager.getNodesByAddress(ipAddress)); } } + + private static Stream<Arguments> nodeStateTransitions() { + return Stream.of( + // start decommissioning or entering maintenance + Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, + HddsProtos.NodeOperationalState.DECOMMISSIONING, true), + Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, + HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, true), + // back to service (DataNodeAdminMonitor aborted the workflow, maintenance end time expired or node is dead) + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONED, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + Arguments.of(HddsProtos.NodeOperationalState.IN_MAINTENANCE, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + // there are no under/over replicated containers on the node, the admin workflow has completed + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, + HddsProtos.NodeOperationalState.DECOMMISSIONED, false), + Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, + HddsProtos.NodeOperationalState.IN_MAINTENANCE, false) + ); + } + + @ParameterizedTest + @MethodSource("nodeStateTransitions") + public void testNodeOperationalStateChangeNotifiesReplicationManager( + HddsProtos.NodeOperationalState oldState, + HddsProtos.NodeOperationalState newState, + boolean shouldNotify) + throws IOException, NodeNotFoundException, AuthenticationException { + // Create a configuration + OzoneConfiguration conf = getConf(); + + // Create a mocked ReplicationManager + ReplicationManager replicationManager = mock(ReplicationManager.class); + + // Create the SCMNodeManager + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + nodeManager.setReplicationManager(replicationManager); + + // Create a test datanode and register it + DatanodeDetails datanode = HddsTestUtils.createRandomDatanodeAndRegister( + nodeManager); + + // Set the initial operational state + nodeManager.setNodeOperationalState(datanode, oldState, 0); + + // Reset the mock to clear any previous calls + org.mockito.Mockito.reset(replicationManager); + + // Call setNodeOperationalState to simulate a node state change + nodeManager.setNodeOperationalState(datanode, newState, 0); + + // Verify the notification was sent to ReplicationManager based on 
shouldNotify flag + verify(replicationManager, times(shouldNotify ? 1 : 0)).notifyNodeStateChange(); + } + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 6eb7d7c943f5..30ae71b7816c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.CommandQueue; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; @@ -69,6 +70,10 @@ public ReplicationNodeManagerMock(Map nodeStatus, this.commandQueue = commandQueue; } + @Override + public void setReplicationManager(ReplicationManager rm) { + } + /** * Get the number of data nodes that in all states. * From caaa2200c3eff6ed1786177db53e3e6e11039ef5 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sun, 2 Mar 2025 21:43:17 +0800 Subject: [PATCH 03/16] Notify ReplicationManager when a node becomes dead --- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 23 +++++++++++++++++-- .../hadoop/hdds/scm/node/NodeManager.java | 2 +- .../scm/server/StorageContainerManager.java | 2 +- .../replication/TestReplicationManager.java | 1 - .../hdds/scm/node/TestDeadNodeHandler.java | 12 +++++++--- 5 files changed, 32 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 20dc5aea7861..7971a852beae 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -51,23 +52,34 @@ public class DeadNodeHandler implements EventHandler { private final ContainerManager containerManager; @Nullable private final DeletedBlockLog deletedBlockLog; + @Nullable + private final ReplicationManager replicationManager; private static final Logger LOG = LoggerFactory.getLogger(DeadNodeHandler.class); + public DeadNodeHandler(final NodeManager nodeManager, + final PipelineManager pipelineManager, + final ContainerManager containerManager) { + this(nodeManager, pipelineManager, containerManager, null, null); + } + public DeadNodeHandler(final NodeManager nodeManager, final PipelineManager pipelineManager, - final ContainerManager containerManager) { - this(nodeManager, pipelineManager, containerManager, null); + final ContainerManager 
containerManager, + final ReplicationManager replicationManager) { + this(nodeManager, pipelineManager, containerManager, replicationManager, null); } public DeadNodeHandler(final NodeManager nodeManager, final PipelineManager pipelineManager, final ContainerManager containerManager, + @Nullable final ReplicationManager replicationManager, @Nullable final DeletedBlockLog deletedBlockLog) { this.nodeManager = nodeManager; this.pipelineManager = pipelineManager; this.containerManager = containerManager; + this.replicationManager = replicationManager; this.deletedBlockLog = deletedBlockLog; } @@ -97,6 +109,13 @@ public void onMessage(final DatanodeDetails datanodeDetails, removeContainerReplicas(datanodeDetails); } + // Wake up ReplicationManager + if (replicationManager != null) { + LOG.info("Notifying ReplicationManager about dead node: {}", + datanodeDetails); + replicationManager.notifyNodeStateChange(); + } + // remove commands in command queue for the DN final List cmdList = nodeManager.getCommandQueue( datanodeDetails.getUuid()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index a75adfe48673..b831c9e0361b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -76,7 +76,7 @@ public interface NodeManager extends StorageContainerNodeProtocol, EventHandler, NodeManagerMXBean, Closeable { /** - * TODO: remove cyclic dependency between NodeManager and ReplicationManager + * TODO: remove cyclic dependency between NodeManager and ReplicationManager. * Set the ReplicationManager * @param rm ReplicationManager instance */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index d66b16072c5a..094de80e6a3d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -474,7 +474,7 @@ private void initializeEventHandlers() { StaleNodeHandler staleNodeHandler = new StaleNodeHandler(scmNodeManager, pipelineManager); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, - pipelineManager, containerManager); + pipelineManager, containerManager, replicationManager); StartDatanodeAdminHandler datanodeStartAdminHandler = new StartDatanodeAdminHandler(scmNodeManager, pipelineManager); ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler = diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 7b671e6c2144..dc75e1864bbf 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -1772,7 +1772,6 @@ public void testPendingOpExpiry() throws ContainerNotFoundException { assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline()); } - @ParameterizedTest @ValueSource(booleans = {true, false}) 
public void testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 46524d49b443..173a88c965e5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -81,6 +82,7 @@ public class TestDeadNodeHandler { private ContainerManager containerManager; private PipelineManagerImpl pipelineManager; private DeadNodeHandler deadNodeHandler; + private ReplicationManager replicationManager; private HealthyReadOnlyNodeHandler healthyReadOnlyNodeHandler; private EventPublisher publisher; private EventQueue eventQueue; @@ -113,9 +115,10 @@ public void setup() throws IOException, AuthenticationException { pipelineManager.setPipelineProvider(RATIS, mockRatisProvider); containerManager = scm.getContainerManager(); + replicationManager = mock(ReplicationManager.class); deletedBlockLog = mock(DeletedBlockLog.class); deadNodeHandler = new DeadNodeHandler(nodeManager, - mock(PipelineManager.class), containerManager, deletedBlockLog); + mock(PipelineManager.class), containerManager, replicationManager, deletedBlockLog); healthyReadOnlyNodeHandler = new HealthyReadOnlyNodeHandler(nodeManager, pipelineManager); @@ -230,6 +233,8 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { assertFalse( nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); + verify(replicationManager, times(1)).notifyNodeStateChange(); + verify(deletedBlockLog, times(0)) .onDatanodeDead(datanode1.getUuid()); @@ -259,8 +264,9 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); assertEquals(0, nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType())); - verify(deletedBlockLog, times(1)) - .onDatanodeDead(datanode1.getUuid()); + verify(replicationManager).notifyNodeStateChange(); + + verify(deletedBlockLog).onDatanodeDead(datanode1.getUuid()); container1Replicas = containerManager .getContainerReplicas(ContainerID.valueOf(container1.getContainerID())); From 431c717897892be5c1a741f2c5b89b65241fec11 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sun, 2 Mar 2025 23:22:13 +0800 Subject: [PATCH 04/16] fix pmd --- .../java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 60061784ddb9..12e776d2705d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -297,6 +297,7 @@ public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) return 
nodeStateManager.getNodeStatus(datanodeDetails); } + @Override public void setReplicationManager(ReplicationManager rm) { replicationManager = rm; } From bc7aa2ff58846172a2a716b29eabe4412b1ba59d Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 3 Mar 2025 01:00:29 +0800 Subject: [PATCH 05/16] fix tests --- .../replication/ReplicationManager.java | 7 ++++++- .../hdds/scm/node/TestDeadNodeHandler.java | 6 +++++- .../hdds/scm/node/TestSCMNodeManager.java | 21 ++++--------------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index e6320e3b4254..397d729a802e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -114,7 +114,7 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp /** * SCMContext from StorageContainerManager. */ - private final SCMContext scmContext; + private SCMContext scmContext; /** @@ -1392,6 +1392,11 @@ public boolean shouldRun() { } } + @VisibleForTesting + public void setScmContext(SCMContext context) { + scmContext = context; + } + @Override public String getServiceName() { return ReplicationManager.class.getSimpleName(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 173a88c965e5..822d13f61acd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -108,6 +109,8 @@ public void setup() throws IOException, AuthenticationException { .setSCM(scm).build(); pipelineManager = (PipelineManagerImpl)scm.getPipelineManager(); + replicationManager = scm.getReplicationManager(); + replicationManager.setScmContext(scmContext); pipelineManager.setScmContext(scmContext); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, @@ -233,7 +236,8 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { assertFalse( nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); - verify(replicationManager, times(1)).notifyNodeStateChange(); + verify(replicationManager).notifyNodeStateChange(); + clearInvocations(replicationManager); verify(deletedBlockLog, times(0)) .onDatanodeDead(datanode1.getUuid()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 16acbbdaa9e1..153dc26fa580 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -2071,30 +2071,17 @@ public void testNodeOperationalStateChangeNotifiesReplicationManager( HddsProtos.NodeOperationalState newState, boolean shouldNotify) throws IOException, NodeNotFoundException, AuthenticationException { - // Create a configuration OzoneConfiguration conf = getConf(); - - // Create a mocked ReplicationManager ReplicationManager replicationManager = mock(ReplicationManager.class); - - // Create the SCMNodeManager + try (SCMNodeManager nodeManager = createNodeManager(conf)) { nodeManager.setReplicationManager(replicationManager); - - // Create a test datanode and register it + DatanodeDetails datanode = HddsTestUtils.createRandomDatanodeAndRegister( nodeManager); - - // Set the initial operational state - nodeManager.setNodeOperationalState(datanode, oldState, 0); - - // Reset the mock to clear any previous calls - org.mockito.Mockito.reset(replicationManager); - - // Call setNodeOperationalState to simulate a node state change + nodeManager.getNodeStateManager().setNodeOperationalState(datanode, oldState); + nodeManager.setNodeOperationalState(datanode, newState, 0); - - // Verify the notification was sent to ReplicationManager based on shouldNotify flag verify(replicationManager, times(shouldNotify ? 1 : 0)).notifyNodeStateChange(); } } From 3c9f8383e1a6c27da5d07e334e007e494ff02245 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 8 Mar 2025 22:38:57 +0800 Subject: [PATCH 06/16] Introduce ReplicationManagerEventHandler to decouple NM and RM --- .../ReplicationManagerEventHandler.java | 44 ++++++++++++++++ .../hadoop/hdds/scm/events/SCMEvents.java | 4 ++ .../TestReplicationManagerEventHandler.java | 52 +++++++++++++++++++ 3 files changed, 100 insertions(+) create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java new file mode 100644 index 000000000000..06f01b4bc395 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles events related to the ReplicationManager. + */ +public class ReplicationManagerEventHandler implements EventHandler<DatanodeDetails> { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationManagerEventHandler.class); + + private final ReplicationManager replicationManager; + + public ReplicationManagerEventHandler(ReplicationManager replicationManager) { + this.replicationManager = replicationManager; + } + + @Override + public void onMessage(DatanodeDetails datanodeDetails, EventPublisher eventPublisher) { + LOG.info("ReplicationManagerEventHandler received event for datanode: {}", datanodeDetails); + replicationManager.notifyNodeStateChange(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index b72f786acd10..797a6dfd613e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -212,6 +212,10 @@ public final class SCMEvents { new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class, "Delete_Block_Status"); + public static final TypedEvent<DatanodeDetails> + REPLICATION_MANAGER_NOTIFY = + new TypedEvent<>(DatanodeDetails.class, "Replication_Manager_Notify"); + /** * Private Ctor. Never Constructed. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java new file mode 100644 index 000000000000..239816023e28 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hdds.scm.container.replication; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Test the ReplicationManagerEventHandler class. 
+ */ +public class TestReplicationManagerEventHandler { + private ReplicationManager replicationManager; + private ReplicationManagerEventHandler replicationManagerEventHandler; + private EventPublisher publisher; + + @BeforeEach + public void setUp() { + replicationManager = mock(ReplicationManager.class); + publisher = mock(EventPublisher.class); + replicationManagerEventHandler = new ReplicationManagerEventHandler(replicationManager); + } + + @Test + public void testReplicationManagerEventHandler() { + DatanodeDetails dataNodeDetails = MockDatanodeDetails.randomDatanodeDetails(); + replicationManagerEventHandler.onMessage(dataNodeDetails, publisher); + + verify(replicationManager).notifyNodeStateChange(); + } +} From c969709bfb2fa8ad6f09a918a2bedba49b007cbd Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 8 Mar 2025 22:43:31 +0800 Subject: [PATCH 07/16] Change notify call to RM to publish RM_NOTIFY event to queue --- .../replication/ReplicationManager.java | 8 +++-- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 14 ++++----- .../hadoop/hdds/scm/node/SCMNodeManager.java | 5 +-- .../replication/TestReplicationManager.java | 2 +- .../hdds/scm/node/TestSCMNodeManager.java | 31 +++++++++++-------- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 397d729a802e..046f12dc6e31 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -1484,10 +1484,12 @@ public boolean hasHealthyPipeline(ContainerInfo container) { * Notify the ReplicationManager that a node state has changed, which might * require container replication. This will wake up the replication monitor * thread if it's sleeping and there's no active replication work in progress. 
+ * + * @return true if the replication monitor was woken up, false otherwise */ - public synchronized void notifyNodeStateChange() { + public synchronized boolean notifyNodeStateChange() { if (!running || serviceStatus == ServiceStatus.PAUSING) { - return; + return false; } // Only wake up the thread if there's no active replication work @@ -1497,8 +1499,10 @@ public synchronized void notifyNodeStateChange() { LOG.info("Waking up replication monitor due to node state change"); // Notify the replication monitor thread to wake up notify(); + return true; } else { LOG.info("Replication queue is not empty, not waking up replication monitor"); + return false; } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 7971a852beae..96176c302399 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -108,13 +108,11 @@ public void onMessage(final DatanodeDetails datanodeDetails, if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) { removeContainerReplicas(datanodeDetails); } - - // Wake up ReplicationManager - if (replicationManager != null) { - LOG.info("Notifying ReplicationManager about dead node: {}", - datanodeDetails); - replicationManager.notifyNodeStateChange(); - } + + // Notify ReplicationManager + LOG.info("Notifying ReplicationManager about dead node: {}", + datanodeDetails); + publisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanodeDetails); // remove commands in command queue for the DN final List cmdList = nodeManager.getCommandQueue( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 12e776d2705d..75f7aeb99017 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -330,14 +330,15 @@ public void setNodeOperationalState(DatanodeDetails datanodeDetails, nodeStateManager.setNodeOperationalState( datanodeDetails, newState, opStateExpiryEpocSec); - if (replicationManager != null && oldState != newState) { + if (oldState != newState) { // Notify when a node is entering maintenance, decommissioning or back to service if (newState == NodeOperationalState.ENTERING_MAINTENANCE || newState == NodeOperationalState.DECOMMISSIONING || newState == NodeOperationalState.IN_SERVICE) { LOG.info("Notifying ReplicationManager of node state change for {}: {} -> {}", datanodeDetails, oldState, newState); - replicationManager.notifyNodeStateChange(); + scmNodeEventPublisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanodeDetails); + } } } diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index dc75e1864bbf..5ed28eaab3fb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -1815,7 +1815,7 @@ public synchronized void processAll() { assertTrue(processAllCalled.get()); processAllCalled.set(false); - customRM.notifyNodeStateChange(); + assertThat(customRM.notifyNodeStateChange()).isEqualTo(queueIsEmpty); GenericTestUtils.waitFor( () -> customRM.isThreadWaiting(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 153dc26fa580..a182ed69acf2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -86,7 +86,7 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -2066,23 +2066,28 @@ private static Stream nodeStateTransitions() { @ParameterizedTest @MethodSource("nodeStateTransitions") - public void testNodeOperationalStateChangeNotifiesReplicationManager( + public void testNodeOperationalStateChange( HddsProtos.NodeOperationalState oldState, HddsProtos.NodeOperationalState newState, boolean shouldNotify) throws IOException, NodeNotFoundException, AuthenticationException { - OzoneConfiguration conf = getConf(); - ReplicationManager replicationManager = mock(ReplicationManager.class); - - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - nodeManager.setReplicationManager(replicationManager); + OzoneConfiguration conf = new OzoneConfiguration(); + SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class); + when(scmStorageConfig.getClusterID()).thenReturn("xyz111"); + EventPublisher eventPublisher = mock(EventPublisher.class); + HDDSLayoutVersionManager lvm = + new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); + createNodeManager(getConf()); + SCMNodeManager nodeManager = new SCMNodeManager(conf, + scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), + scmContext, lvm); - DatanodeDetails datanode = HddsTestUtils.createRandomDatanodeAndRegister( - nodeManager); - nodeManager.getNodeStateManager().setNodeOperationalState(datanode, oldState); + DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails(); + datanode.setPersistedOpState(oldState); + nodeManager.register(datanode, null, HddsTestUtils.getRandomPipelineReports()); + nodeManager.setNodeOperationalState(datanode, newState, 0); + verify(eventPublisher, times(shouldNotify ? 
1 : 0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode); - nodeManager.setNodeOperationalState(datanode, newState, 0); - verify(replicationManager, times(shouldNotify ? 1 : 0)).notifyNodeStateChange(); - } + nodeManager.close(); } } From f7363deb83b48b73214a32751d26df2931ad6e73 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 8 Mar 2025 22:43:51 +0800 Subject: [PATCH 08/16] Some old code cleanup --- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 15 ++------------- .../apache/hadoop/hdds/scm/node/NodeManager.java | 7 ------- .../hadoop/hdds/scm/node/SCMNodeManager.java | 7 ------- .../hdds/scm/server/StorageContainerManager.java | 3 +-- .../hdds/scm/container/MockNodeManager.java | 5 ----- .../hdds/scm/container/SimpleMockNodeManager.java | 5 ----- .../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 15 +++++---------- .../testutils/ReplicationNodeManagerMock.java | 5 ----- 8 files changed, 8 insertions(+), 54 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 96176c302399..b5f45efb1b1b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -52,34 +52,23 @@ public class DeadNodeHandler implements EventHandler { private final ContainerManager containerManager; @Nullable private final DeletedBlockLog deletedBlockLog; - @Nullable - private final ReplicationManager replicationManager; private static final Logger LOG = LoggerFactory.getLogger(DeadNodeHandler.class); - public DeadNodeHandler(final NodeManager nodeManager, - final PipelineManager pipelineManager, - final ContainerManager containerManager) { - this(nodeManager, pipelineManager, containerManager, null, null); - } - public DeadNodeHandler(final NodeManager nodeManager, final PipelineManager pipelineManager, - final ContainerManager containerManager, - final ReplicationManager replicationManager) { - this(nodeManager, pipelineManager, containerManager, replicationManager, null); + final ContainerManager containerManager) { + this(nodeManager, pipelineManager, containerManager, null); } public DeadNodeHandler(final NodeManager nodeManager, final PipelineManager pipelineManager, final ContainerManager containerManager, - @Nullable final ReplicationManager replicationManager, @Nullable final DeletedBlockLog deletedBlockLog) { this.nodeManager = nodeManager; this.pipelineManager = pipelineManager; this.containerManager = containerManager; - this.replicationManager = replicationManager; this.deletedBlockLog = deletedBlockLog; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index b831c9e0361b..e6a74b395f77 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import 
org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -75,12 +74,6 @@ public interface NodeManager extends StorageContainerNodeProtocol, EventHandler, NodeManagerMXBean, Closeable { - /** - * TODO: remove cyclic dependency between NodeManager and ReplicationManager. - * Set the ReplicationManager - * @param rm ReplicationManager instance - */ - void setReplicationManager(ReplicationManager rm); /** * Register API without a layout version info object passed in. Useful for diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 75f7aeb99017..551bde7376ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -69,7 +69,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -135,7 +134,6 @@ public class SCMNodeManager implements NodeManager { private final SCMContext scmContext; private final Map>> sendCommandNotifyMap; - private volatile ReplicationManager replicationManager; /** * Lock used to synchronize some operation in Node manager to ensure a @@ -297,11 +295,6 @@ public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) return nodeStateManager.getNodeStatus(datanodeDetails); } - @Override - public void setReplicationManager(ReplicationManager rm) { - replicationManager = rm; - } - /** * Set the operation state of a node. * @param datanodeDetails The datanode to set the new state for diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 094de80e6a3d..cd8da86f7e4c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -474,7 +474,7 @@ private void initializeEventHandlers() { StaleNodeHandler staleNodeHandler = new StaleNodeHandler(scmNodeManager, pipelineManager); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, - pipelineManager, containerManager, replicationManager); + pipelineManager, containerManager); StartDatanodeAdminHandler datanodeStartAdminHandler = new StartDatanodeAdminHandler(scmNodeManager, pipelineManager); ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler = @@ -818,7 +818,6 @@ private void initializeSystemManagers(OzoneConfiguration conf, containerReplicaPendingOps); reconfigurationHandler.register(replicationManager.getConfig()); } - scmNodeManager.setReplicationManager(replicationManager); serviceManager.register(replicationManager); // RM gets notified of expired pending delete from containerReplicaPendingOps by subscribing to it // so it can resend them. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 648a86574ed7..244f86e79540 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -239,10 +238,6 @@ private void populateNodeMetric(DatanodeDetails datanodeDetails, int x) { } - @Override - public void setReplicationManager(ReplicationManager rm) { - } - /** * Gets all Live Datanodes that is currently communicating with SCM. * diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 234eb69dac71..ea1054784d09 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; @@ -95,10 +94,6 @@ public void setPipelines(DatanodeDetails dd, int count) { pipelineMap.put(dd.getUuid(), pipelines); } - @Override - public void setReplicationManager(ReplicationManager rm) { - } - /** * If the given node was registered with the nodeManager, return the * NodeStatus for the node. 
Otherwise return a NodeStatus of "In Service diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 822d13f61acd..b003b9784b2d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerReplica; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -83,7 +82,6 @@ public class TestDeadNodeHandler { private ContainerManager containerManager; private PipelineManagerImpl pipelineManager; private DeadNodeHandler deadNodeHandler; - private ReplicationManager replicationManager; private HealthyReadOnlyNodeHandler healthyReadOnlyNodeHandler; private EventPublisher publisher; private EventQueue eventQueue; @@ -109,8 +107,6 @@ public void setup() throws IOException, AuthenticationException { .setSCM(scm).build(); pipelineManager = (PipelineManagerImpl)scm.getPipelineManager(); - replicationManager = scm.getReplicationManager(); - replicationManager.setScmContext(scmContext); pipelineManager.setScmContext(scmContext); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, @@ -118,10 +114,9 @@ public void setup() throws IOException, AuthenticationException { pipelineManager.setPipelineProvider(RATIS, mockRatisProvider); containerManager = scm.getContainerManager(); - replicationManager = mock(ReplicationManager.class); deletedBlockLog = mock(DeletedBlockLog.class); deadNodeHandler = new DeadNodeHandler(nodeManager, - mock(PipelineManager.class), containerManager, replicationManager, deletedBlockLog); + mock(PipelineManager.class), containerManager, deletedBlockLog); healthyReadOnlyNodeHandler = new HealthyReadOnlyNodeHandler(nodeManager, pipelineManager); @@ -236,8 +231,9 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { assertFalse( nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); - verify(replicationManager).notifyNodeStateChange(); - clearInvocations(replicationManager); + verify(publisher).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1); + + clearInvocations(publisher); verify(deletedBlockLog, times(0)) .onDatanodeDead(datanode1.getUuid()); @@ -268,8 +264,7 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); assertEquals(0, nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType())); - verify(replicationManager).notifyNodeStateChange(); - + verify(publisher).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1); verify(deletedBlockLog).onDatanodeDead(datanode1.getUuid()); container1Replicas = containerManager diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 30ae71b7816c..6eb7d7c943f5 100644 --- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.CommandQueue; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; @@ -70,10 +69,6 @@ public ReplicationNodeManagerMock(Map nodeStatus, this.commandQueue = commandQueue; } - @Override - public void setReplicationManager(ReplicationManager rm) { - } - /** * Get the number of data nodes that in all states. * From 00e526f0c5b8ab2829549c2c95327d42be940d45 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 8 Mar 2025 23:19:21 +0800 Subject: [PATCH 09/16] Add ReplicationManagerEventHandler to SCM event queue --- .../hadoop/hdds/scm/server/StorageContainerManager.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index cd8da86f7e4c..103dc1ed747b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerEventHandler; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; @@ -495,11 +496,16 @@ private void initializeEventHandlers() { PipelineActionHandler pipelineActionHandler = new PipelineActionHandler(pipelineManager, scmContext); + ReplicationManagerEventHandler replicationManagerEventHandler = + new ReplicationManagerEventHandler(replicationManager); + eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND_COUNT_UPDATED, new DatanodeCommandCountUpdatedHandler(replicationManager)); + eventQueue.addHandler(SCMEvents.REPLICATION_MANAGER_NOTIFY, + replicationManagerEventHandler); // Use the same executor for both ICR and FCR. // The Executor maps the event to a thread for DN. 
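Note on the wiring above: the handler registered for SCMEvents.REPLICATION_MANAGER_NOTIFY follows the usual SCM EventHandler shape, receiving the DatanodeDetails that triggered the event and relaying the wake-up to ReplicationManager.notifyNodeStateChange(). The sketch below is a minimal illustration of that shape; the SCMContext guards (leader readiness, safe mode) added later in patch 13 are deliberately omitted, and the exact body of the original class may differ.

package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;

/**
 * Sketch of the event handler wired into the SCM event queue above.
 * It simply relays the node-state-change signal to the ReplicationManager.
 */
public class ReplicationManagerEventHandler implements EventHandler<DatanodeDetails> {

  private final ReplicationManager replicationManager;

  public ReplicationManagerEventHandler(ReplicationManager replicationManager) {
    this.replicationManager = replicationManager;
  }

  @Override
  public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) {
    // Wake the replication monitor so the node state change is handled promptly.
    replicationManager.notifyNodeStateChange();
  }
}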
From 6a53c3a7f74eac8489a7786186251f46ef0ddd0f Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 10 Mar 2025 17:27:31 +0800 Subject: [PATCH 10/16] NPE in TestDeadNodeHandler --- .../org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index b003b9784b2d..dadf486385e4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -108,6 +108,7 @@ public void setup() throws IOException, AuthenticationException { pipelineManager = (PipelineManagerImpl)scm.getPipelineManager(); pipelineManager.setScmContext(scmContext); + scm.getReplicationManager().setScmContext(scmContext); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), conf); From 32705a2b536afe844709ceea7f09bc98f968c585 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 11 Mar 2025 18:18:59 +0800 Subject: [PATCH 11/16] Notify RM if persisted op state changed --- .../hadoop/hdds/scm/node/SCMNodeManager.java | 37 +++++++++++-------- .../hdds/scm/node/TestSCMNodeManager.java | 21 ++++++++--- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 551bde7376ed..761c2f5e5d4e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -318,22 +318,8 @@ public void setNodeOperationalState(DatanodeDetails datanodeDetails, public void setNodeOperationalState(DatanodeDetails datanodeDetails, NodeOperationalState newState, long opStateExpiryEpocSec) throws NodeNotFoundException { - NodeOperationalState oldState = getNodeStatus(datanodeDetails).getOperationalState(); - nodeStateManager.setNodeOperationalState( datanodeDetails, newState, opStateExpiryEpocSec); - - if (oldState != newState) { - // Notify when a node is entering maintenance, decommissioning or back to service - if (newState == NodeOperationalState.ENTERING_MAINTENANCE - || newState == NodeOperationalState.DECOMMISSIONING - || newState == NodeOperationalState.IN_SERVICE) { - LOG.info("Notifying ReplicationManager of node state change for {}: {} -> {}", - datanodeDetails, oldState, newState); - scmNodeEventPublisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanodeDetails); - - } - } } /** @@ -648,9 +634,30 @@ protected void updateDatanodeOpState(DatanodeDetails reportedDn) } } DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn); + NodeOperationalState oldPersistedOpState = scmDnd.getPersistedOpState(); + NodeOperationalState newPersistedOpState = reportedDn.getPersistedOpState(); + scmDnd.setPersistedOpStateExpiryEpochSec( reportedDn.getPersistedOpStateExpiryEpochSec()); - scmDnd.setPersistedOpState(reportedDn.getPersistedOpState()); + scmDnd.setPersistedOpState(newPersistedOpState); + + maybeNotifyReplicationManager(reportedDn, oldPersistedOpState, newPersistedOpState); + } + + private void maybeNotifyReplicationManager( + DatanodeDetails datanode, + NodeOperationalState oldState, + 
NodeOperationalState newState) { + if (oldState != newState) { + // Notify when a node is entering maintenance, decommissioning or back to service + if (newState == NodeOperationalState.ENTERING_MAINTENANCE + || newState == NodeOperationalState.DECOMMISSIONING + || newState == NodeOperationalState.IN_SERVICE) { + LOG.info("Notifying ReplicationManager of node state change for {}: {} -> {}", + datanode, oldState, newState); + scmNodeEventPublisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode); + } + } } @Override diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index a182ed69acf2..d085d7a1bc2b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -2067,26 +2067,35 @@ private static Stream nodeStateTransitions() { @ParameterizedTest @MethodSource("nodeStateTransitions") public void testNodeOperationalStateChange( - HddsProtos.NodeOperationalState oldState, - HddsProtos.NodeOperationalState newState, + HddsProtos.NodeOperationalState oldState, + HddsProtos.NodeOperationalState newState, boolean shouldNotify) throws IOException, NodeNotFoundException, AuthenticationException { OzoneConfiguration conf = new OzoneConfiguration(); SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class); when(scmStorageConfig.getClusterID()).thenReturn("xyz111"); EventPublisher eventPublisher = mock(EventPublisher.class); - HDDSLayoutVersionManager lvm = - new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); + HDDSLayoutVersionManager lvm = new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); createNodeManager(getConf()); - SCMNodeManager nodeManager = new SCMNodeManager(conf, + SCMNodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), scmContext, lvm); DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails(); datanode.setPersistedOpState(oldState); nodeManager.register(datanode, null, HddsTestUtils.getRandomPipelineReports()); + nodeManager.setNodeOperationalState(datanode, newState, 0); - verify(eventPublisher, times(shouldNotify ? 1 : 0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode); + verify(eventPublisher, times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode); + + DatanodeDetails reportedDatanode = MockDatanodeDetails.createDatanodeDetails( + datanode.getUuid()); + reportedDatanode.setPersistedOpState(newState); + + nodeManager.processHeartbeat(reportedDatanode); + + verify(eventPublisher, times(shouldNotify ? 
1 : 0)).fireEvent( + SCMEvents.REPLICATION_MANAGER_NOTIFY, reportedDatanode); nodeManager.close(); } From bf17079fe771afad1333451465996f37c70515e7 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 11 Mar 2025 18:20:00 +0800 Subject: [PATCH 12/16] Add integration test for RM on RM being notify when node status changed scenario --- .../TestReplicationManagerIntegration.java | 313 ++++++++++++++++++ 1 file changed, 313 insertions(+) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java new file mode 100644 index 000000000000..9ef9693ea085 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.getDNHostAndPort; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachHealthState; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachOpState; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachPersistedOpState; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.stream.Stream; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneClusterProvider; +import org.apache.hadoop.ozone.OzoneTestUtils; 
+import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.OzoneKeyLocation; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for ReplicationManager. + */ +public class TestReplicationManagerIntegration { + private static final int DATANODE_COUNT = 5; + private static final int HEALTHY_REPLICA_NUM = 3; + private static String bucketName = "bucket1"; + private static String volName = "vol1"; + private static RatisReplicationConfig ratisRepConfig = RatisReplicationConfig + .getInstance(HddsProtos.ReplicationFactor.THREE); + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationManagerIntegration.class); + + private MiniOzoneCluster cluster; + private NodeManager nodeManager; + private ContainerManager containerManager; + private ReplicationManager replicationManager; + private StorageContainerManager scm; + private OzoneClient client; + private ContainerOperationClient scmClient; + private OzoneBucket bucket; + + private static MiniOzoneClusterProvider clusterProvider; + + @BeforeAll + public static void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); + conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL, + 1, SECONDS); + conf.setTimeDuration( + ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL, + 1, SECONDS); + conf.setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, + 0, SECONDS); + conf.set(OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); + + ReplicationManagerConfiguration replicationConf = conf.getObject(ReplicationManagerConfiguration.class); + replicationConf.setInterval(Duration.ofSeconds(1)); + replicationConf.setUnderReplicatedInterval(Duration.ofSeconds(1)); + replicationConf.setOverReplicatedInterval(Duration.ofSeconds(1)); + conf.setFromObject(replicationConf); + + MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(DATANODE_COUNT); + + clusterProvider = new MiniOzoneClusterProvider(builder, 9); + } + + @AfterAll + public static void shutdown() throws InterruptedException { + if (clusterProvider != null) { + clusterProvider.shutdown(); + } + } + + @BeforeEach + public void 
setUp() throws Exception { + cluster = clusterProvider.provide(); + cluster.getConf().setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 0, SECONDS); + cluster.waitForClusterToBeReady(); + + scm = cluster.getStorageContainerManager(); + nodeManager = scm.getScmNodeManager(); + containerManager = scm.getContainerManager(); + replicationManager = scm.getReplicationManager(); + + client = cluster.newClient(); + scmClient = new ContainerOperationClient(cluster.getConf()); + bucket = TestDataUtil.createVolumeAndBucket(client, volName, bucketName); + } + + @AfterEach + public void tearDown() throws InterruptedException, IOException { + IOUtils.close(LOG, client, scmClient); + if (cluster != null) { + clusterProvider.destroy(cluster); + } + } + + @Test + public void testClosedContainerReplicationWhenNodeDies() + throws Exception { + // Test if RM notify works + replicationManager.getConfig().setInterval(Duration.ofSeconds(300)); + GenericTestUtils.waitFor(() -> { + return replicationManager.isThreadWaiting(); + }, 1000, 30000); + + String keyName = "key-" + UUID.randomUUID(); + TestDataUtil.createKey(bucket, keyName, ratisRepConfig, + "this is the content".getBytes(StandardCharsets.UTF_8)); + + // Get the container ID for the key + OzoneKey key = bucket.getKey(keyName); + OzoneKeyDetails keyDetails = (OzoneKeyDetails) key; + List keyLocations = keyDetails.getOzoneKeyLocations(); + long containerID = keyLocations.get(0).getContainerID(); + ContainerID containerId = ContainerID.valueOf(containerID); + // open container would not be handled to do any further processing in RM + OzoneTestUtils.closeContainer(scm, containerManager.getContainer(containerId)); + + assertEquals(HEALTHY_REPLICA_NUM, containerManager.getContainerReplicas(containerId).size()); + + // Find a datanode that has a replica of this container + final DatanodeDetails targetDatanode = containerManager.getContainerReplicas(containerId).stream().findFirst().get() + .getDatanodeDetails(); + + cluster.shutdownHddsDatanode(targetDatanode); + waitForDnToReachHealthState(nodeManager, targetDatanode, DEAD); + + // Check if the replicas nodes don't contain dead one + // and the replica of container replica num is considered to be healthy + GenericTestUtils.waitFor(() -> { + try { + Set replicas = containerManager.getContainerReplicas(containerId); + boolean deadNodeNotInContainerReplica = replicas.stream() + .noneMatch(r -> r.getDatanodeDetails().equals(targetDatanode)); + boolean hasHealthyReplicaNum = replicas.size() == HEALTHY_REPLICA_NUM; + return deadNodeNotInContainerReplica && hasHealthyReplicaNum; + } catch (ContainerNotFoundException e) { + return false; + } + }, 100, 30000); + } + + private static Stream decommissionTransitionTestCases() { + return Stream.of( + // Test case 1: Node enters maintenance + Arguments.of( + "Node enters maintenance", + (BiConsumer) (client, dn) -> { + try { + client.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + IN_MAINTENANCE), + // Test case 2: Node is decommissioned + Arguments.of( + "Node is decommissioned", + (BiConsumer) (client, dn) -> { + try { + client.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)), false); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + DECOMMISSIONED)); + } + + @ParameterizedTest + @MethodSource("decommissionTransitionTestCases") + public void testClosedContainerReplicationWhenNodeDecommissionAndBackToInService( + String testName, + BiConsumer 
action, + NodeOperationalState expectedOpState) + throws Exception { + + // Test if RM notify works + replicationManager.getConfig().setInterval(Duration.ofSeconds(300)); + GenericTestUtils.waitFor(() -> { + return replicationManager.isThreadWaiting(); + }, 1000, 30000); + + String keyName = "key-" + UUID.randomUUID(); + TestDataUtil.createKey(bucket, keyName, ratisRepConfig, + "this is the content".getBytes(StandardCharsets.UTF_8)); + + OzoneKey key = bucket.getKey(keyName); + OzoneKeyDetails keyDetails = (OzoneKeyDetails) key; + List keyLocations = keyDetails.getOzoneKeyLocations(); + + long containerID = keyLocations.get(0).getContainerID(); + ContainerID containerId = ContainerID.valueOf(containerID); + ContainerInfo containerInfo = containerManager.getContainer(containerId); + OzoneTestUtils.closeContainer(scm, containerInfo); + + assertEquals(containerManager.getContainerReplicas(containerId).size(), HEALTHY_REPLICA_NUM); + + DatanodeDetails datanode = containerManager.getContainerReplicas(containerId).stream().findFirst().get() + .getDatanodeDetails(); + + action.accept(scmClient, datanode); + + waitForDnToReachOpState(nodeManager, datanode, expectedOpState); + + if (expectedOpState == DECOMMISSIONED) { + // decommissioning node would be excluded + assertEquals(containerManager.getContainerReplicas(containerId).size(), + HEALTHY_REPLICA_NUM + 1); + } else { + assertEquals(containerManager.getContainerReplicas(containerId).size(), + HEALTHY_REPLICA_NUM); + } + + // bring the node back to service + scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(datanode))); + + waitForDnToReachOpState(nodeManager, datanode, IN_SERVICE); + waitForDnToReachPersistedOpState(datanode, IN_SERVICE); + + GenericTestUtils.waitFor(() -> { + try { + return containerManager.getContainerReplicas(containerId).size() == HEALTHY_REPLICA_NUM; + } catch (Exception e) { + return false; + } + }, 1000, 30000); + } +} From 6092de1d69a2920b2a320da58ce17cac92819877 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 17 Mar 2025 00:36:35 +0800 Subject: [PATCH 13/16] Addressed comment: Only notify when leader ready and out of safemode --- .../ReplicationManagerEventHandler.java | 11 +++++-- .../hadoop/hdds/scm/node/SCMNodeManager.java | 4 +++ .../TestReplicationManagerEventHandler.java | 32 +++++++++++++++---- .../hdds/scm/node/TestSCMNodeManager.java | 16 +++++----- 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java index 06f01b4bc395..c63c44596d82 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container.replication; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.slf4j.Logger; @@ -31,14 +32,20 @@ public class ReplicationManagerEventHandler implements EventHandler testData() { + return Stream.of( + Arguments.of(true, false, true), + Arguments.of(false, true, false), + Arguments.of(true, true, false), + 
Arguments.of(false, false, false) + ); } - @Test - public void testReplicationManagerEventHandler() { + @ParameterizedTest + @MethodSource("testData") + public void testReplicationManagerEventHandler(boolean isLeaderReady, boolean isInSafeMode, + boolean isExpectedToNotify) { + when(scmContext.isLeaderReady()).thenReturn(isLeaderReady); + when(scmContext.isInSafeMode()).thenReturn(isInSafeMode); DatanodeDetails dataNodeDetails = MockDatanodeDetails.randomDatanodeDetails(); replicationManagerEventHandler.onMessage(dataNodeDetails, publisher); - verify(replicationManager).notifyNodeStateChange(); + verify(replicationManager, times(isExpectedToNotify ? 1 : 0)).notifyNodeStateChange(); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index d085d7a1bc2b..8f7617a799c2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -2043,23 +2043,23 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname() private static Stream nodeStateTransitions() { return Stream.of( // start decommissioning or entering maintenance - Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, + Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, HddsProtos.NodeOperationalState.DECOMMISSIONING, true), - Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, + Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, true), // back to service (DataNodeAdminMonitor abort workflow, maintenance end time expired or node is dead) - Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, HddsProtos.NodeOperationalState.IN_SERVICE, true), - Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONED, + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONED, HddsProtos.NodeOperationalState.IN_SERVICE, true), - Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, + Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, HddsProtos.NodeOperationalState.IN_SERVICE, true), - Arguments.of(HddsProtos.NodeOperationalState.IN_MAINTENANCE, + Arguments.of(HddsProtos.NodeOperationalState.IN_MAINTENANCE, HddsProtos.NodeOperationalState.IN_SERVICE, true), // there is no under/over replicated containers on the node, completed the admin workflow - Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, HddsProtos.NodeOperationalState.DECOMMISSIONED, false), - Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, + Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, HddsProtos.NodeOperationalState.IN_MAINTENANCE, false) ); } From b15af64be8381feaa9f2de2f222d223b38ed122c Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 17 Mar 2025 00:37:54 +0800 Subject: [PATCH 14/16] Addressed comment: don't notify when RM is running --- .../scm/container/replication/ReplicationManager.java | 9 +++++++-- .../hadoop/hdds/scm/server/StorageContainerManager.java | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 046f12dc6e31..c08e9a38f02f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -1492,16 +1492,21 @@ public synchronized boolean notifyNodeStateChange() { return false; } + if (!isThreadWaiting()) { + LOG.debug("Replication monitor is running, not need to wake it up"); + return false; + } + // Only wake up the thread if there's no active replication work // This prevents creating a new replication queue over and over // when multiple nodes change state in quick succession if (getQueue().isEmpty()) { - LOG.info("Waking up replication monitor due to node state change"); + LOG.debug("Waking up replication monitor due to node state change"); // Notify the replication monitor thread to wake up notify(); return true; } else { - LOG.info("Replication queue is not empty, not waking up replication monitor"); + LOG.debug("Replication queue is not empty, not waking up replication monitor"); return false; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 103dc1ed747b..b6eb2b7e32b1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -497,7 +497,7 @@ private void initializeEventHandlers() { new PipelineActionHandler(pipelineManager, scmContext); ReplicationManagerEventHandler replicationManagerEventHandler = - new ReplicationManagerEventHandler(replicationManager); + new ReplicationManagerEventHandler(replicationManager, scmContext); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); From b2642bc5e3b4279396cb621568ea2fc0f8e28b4a Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 17 Mar 2025 00:39:09 +0800 Subject: [PATCH 15/16] Addressed comment: Only notify when node is not IN_MAINTENANCE status in DeadNodeHandler --- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 17 ++++++++++------- .../hdds/scm/node/TestDeadNodeHandler.java | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index b5f45efb1b1b..4f7a0e9382fc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -92,17 +92,21 @@ public void onMessage(final DatanodeDetails datanodeDetails, closeContainers(datanodeDetails, publisher); destroyPipelines(datanodeDetails); + boolean isNodeInMaintenance = nodeManager.getNodeStatus(datanodeDetails).isInMaintenance(); + // Remove the container replicas associated with the dead node unless it // is IN_MAINTENANCE - if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) { + if (!isNodeInMaintenance) { removeContainerReplicas(datanodeDetails); } // Notify ReplicationManager - LOG.info("Notifying ReplicationManager about dead 
node: {}", - datanodeDetails); - publisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanodeDetails); - + if (!isNodeInMaintenance) { + LOG.debug("Notifying ReplicationManager about dead node: {}", + datanodeDetails); + publisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanodeDetails); + } + // remove commands in command queue for the DN final List cmdList = nodeManager.getCommandQueue( datanodeDetails.getUuid()); @@ -111,8 +115,7 @@ public void onMessage(final DatanodeDetails datanodeDetails, // remove DeleteBlocksCommand associated with the dead node unless it // is IN_MAINTENANCE - if (deletedBlockLog != null && - !nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) { + if (deletedBlockLog != null && !isNodeInMaintenance) { deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index dadf486385e4..f588491e0680 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -232,7 +232,7 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { assertFalse( nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); - verify(publisher).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1); + verify(publisher, times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1); clearInvocations(publisher); From ffd034d7f581b37cfe3b22baa21645cfdc8139a0 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 9 Apr 2025 03:29:00 +0000 Subject: [PATCH 16/16] Reuse cluster in TestReplicationManagerIntegration Co-authored-by: Doroszlai, Attila <6454655+adoroszlai@users.noreply.github.com> --- .../TestReplicationManagerIntegration.java | 170 +++++++----------- 1 file changed, 60 insertions(+), 110 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java index 9ef9693ea085..8556a01e5333 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.container.replication; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; @@ -24,7 +25,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; @@ -41,16 +41,12 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.stream.Stream; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -68,35 +64,34 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.MiniOzoneClusterProvider; import org.apache.hadoop.ozone.OzoneTestUtils; import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneKey; import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.client.OzoneKeyLocation; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.EnumSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Integration test for ReplicationManager. 
*/ -public class TestReplicationManagerIntegration { +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class TestReplicationManagerIntegration { private static final int DATANODE_COUNT = 5; private static final int HEALTHY_REPLICA_NUM = 3; - private static String bucketName = "bucket1"; - private static String volName = "vol1"; - private static RatisReplicationConfig ratisRepConfig = RatisReplicationConfig + private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG = RatisReplicationConfig .getInstance(HddsProtos.ReplicationFactor.THREE); private static final Logger LOG = LoggerFactory.getLogger(TestReplicationManagerIntegration.class); @@ -109,22 +104,20 @@ public class TestReplicationManagerIntegration { private ContainerOperationClient scmClient; private OzoneBucket bucket; - private static MiniOzoneClusterProvider clusterProvider; - @BeforeAll - public static void init() throws Exception { + void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, - 100, TimeUnit.MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + 100, MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 100, MILLISECONDS); conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); - conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 2, SECONDS); conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL, 1, SECONDS); conf.setTimeDuration( @@ -138,26 +131,14 @@ public static void init() throws Exception { ReplicationManagerConfiguration replicationConf = conf.getObject(ReplicationManagerConfiguration.class); replicationConf.setInterval(Duration.ofSeconds(1)); - replicationConf.setUnderReplicatedInterval(Duration.ofSeconds(1)); - replicationConf.setOverReplicatedInterval(Duration.ofSeconds(1)); + replicationConf.setUnderReplicatedInterval(Duration.ofMillis(100)); + replicationConf.setOverReplicatedInterval(Duration.ofMillis(100)); conf.setFromObject(replicationConf); MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(DATANODE_COUNT); - clusterProvider = new MiniOzoneClusterProvider(builder, 9); - } - - @AfterAll - public static void shutdown() throws InterruptedException { - if (clusterProvider != null) { - clusterProvider.shutdown(); - } - } - - @BeforeEach - public void setUp() throws Exception { - cluster = clusterProvider.provide(); + cluster = builder.build(); cluster.getConf().setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 0, SECONDS); cluster.waitForClusterToBeReady(); @@ -168,33 +149,32 @@ public void setUp() throws Exception { client = cluster.newClient(); scmClient = new ContainerOperationClient(cluster.getConf()); - bucket = 
TestDataUtil.createVolumeAndBucket(client, volName, bucketName); + bucket = TestDataUtil.createVolumeAndBucket(client); } - @AfterEach - public void tearDown() throws InterruptedException, IOException { - IOUtils.close(LOG, client, scmClient); - if (cluster != null) { - clusterProvider.destroy(cluster); - } + @AfterAll + void shutdown() { + IOUtils.close(LOG, client, scmClient, cluster); } + @Order(1) @Test - public void testClosedContainerReplicationWhenNodeDies() - throws Exception { + void testReplicationManagerNotify() throws Exception { // Test if RM notify works replicationManager.getConfig().setInterval(Duration.ofSeconds(300)); - GenericTestUtils.waitFor(() -> { - return replicationManager.isThreadWaiting(); - }, 1000, 30000); + GenericTestUtils.waitFor(() -> replicationManager.isThreadWaiting(), 200, 30000); + } + @Order(Integer.MAX_VALUE) + @Test + public void testClosedContainerReplicationWhenNodeDies() + throws Exception { String keyName = "key-" + UUID.randomUUID(); - TestDataUtil.createKey(bucket, keyName, ratisRepConfig, + TestDataUtil.createKey(bucket, keyName, RATIS_REPLICATION_CONFIG, "this is the content".getBytes(StandardCharsets.UTF_8)); // Get the container ID for the key - OzoneKey key = bucket.getKey(keyName); - OzoneKeyDetails keyDetails = (OzoneKeyDetails) key; + OzoneKeyDetails keyDetails = bucket.getKey(keyName); List keyLocations = keyDetails.getOzoneKeyLocations(); long containerID = keyLocations.get(0).getContainerID(); ContainerID containerId = ContainerID.valueOf(containerID); @@ -203,9 +183,7 @@ public void testClosedContainerReplicationWhenNodeDies() assertEquals(HEALTHY_REPLICA_NUM, containerManager.getContainerReplicas(containerId).size()); - // Find a datanode that has a replica of this container - final DatanodeDetails targetDatanode = containerManager.getContainerReplicas(containerId).stream().findFirst().get() - .getDatanodeDetails(); + final DatanodeDetails targetDatanode = findReplica(containerId); cluster.shutdownHddsDatanode(targetDatanode); waitForDnToReachHealthState(nodeManager, targetDatanode, DEAD); @@ -225,53 +203,26 @@ public void testClosedContainerReplicationWhenNodeDies() }, 100, 30000); } - private static Stream decommissionTransitionTestCases() { - return Stream.of( - // Test case 1: Node enters maintenance - Arguments.of( - "Node enters maintenance", - (BiConsumer) (client, dn) -> { - try { - client.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0, false); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - IN_MAINTENANCE), - // Test case 2: Node is decommissioned - Arguments.of( - "Node is decommissioned", - (BiConsumer) (client, dn) -> { - try { - client.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)), false); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - DECOMMISSIONED)); + private DatanodeDetails findReplica(ContainerID containerId) throws ContainerNotFoundException { + // Find a datanode that has a replica of this container + return containerManager.getContainerReplicas(containerId).stream() + .findFirst() + .orElseThrow(() -> new AssertionError("Replica not found for " + containerId)) + .getDatanodeDetails(); } @ParameterizedTest - @MethodSource("decommissionTransitionTestCases") - public void testClosedContainerReplicationWhenNodeDecommissionAndBackToInService( - String testName, - BiConsumer action, + @EnumSource(value = NodeOperationalState.class, names = {"IN_MAINTENANCE", "DECOMMISSIONED"}) + void 
testClosedContainerReplicationWhenNodeDecommissionAndBackToInService( NodeOperationalState expectedOpState) throws Exception { - // Test if RM notify works - replicationManager.getConfig().setInterval(Duration.ofSeconds(300)); - GenericTestUtils.waitFor(() -> { - return replicationManager.isThreadWaiting(); - }, 1000, 30000); - String keyName = "key-" + UUID.randomUUID(); - TestDataUtil.createKey(bucket, keyName, ratisRepConfig, + TestDataUtil.createKey(bucket, keyName, RATIS_REPLICATION_CONFIG, "this is the content".getBytes(StandardCharsets.UTF_8)); - OzoneKey key = bucket.getKey(keyName); - OzoneKeyDetails keyDetails = (OzoneKeyDetails) key; - List keyLocations = keyDetails.getOzoneKeyLocations(); + OzoneKeyDetails key = bucket.getKey(keyName); + List keyLocations = key.getOzoneKeyLocations(); long containerID = keyLocations.get(0).getContainerID(); ContainerID containerId = ContainerID.valueOf(containerID); @@ -280,24 +231,23 @@ public void testClosedContainerReplicationWhenNodeDecommissionAndBackToInService assertEquals(containerManager.getContainerReplicas(containerId).size(), HEALTHY_REPLICA_NUM); - DatanodeDetails datanode = containerManager.getContainerReplicas(containerId).stream().findFirst().get() - .getDatanodeDetails(); + DatanodeDetails datanode = findReplica(containerId); - action.accept(scmClient, datanode); - - waitForDnToReachOpState(nodeManager, datanode, expectedOpState); - - if (expectedOpState == DECOMMISSIONED) { - // decommissioning node would be excluded + if (expectedOpState == IN_MAINTENANCE) { + scmClient.startMaintenanceNodes(Collections.singletonList(getDNHostAndPort(datanode)), 0, false); + waitForDnToReachOpState(nodeManager, datanode, expectedOpState); assertEquals(containerManager.getContainerReplicas(containerId).size(), - HEALTHY_REPLICA_NUM + 1); + HEALTHY_REPLICA_NUM); } else { + scmClient.decommissionNodes(Collections.singletonList(getDNHostAndPort(datanode)), false); + waitForDnToReachOpState(nodeManager, datanode, expectedOpState); + // decommissioning node would be excluded assertEquals(containerManager.getContainerReplicas(containerId).size(), - HEALTHY_REPLICA_NUM); + HEALTHY_REPLICA_NUM + 1); } // bring the node back to service - scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(datanode))); + scmClient.recommissionNodes(Collections.singletonList(getDNHostAndPort(datanode))); waitForDnToReachOpState(nodeManager, datanode, IN_SERVICE); waitForDnToReachPersistedOpState(datanode, IN_SERVICE); @@ -308,6 +258,6 @@ public void testClosedContainerReplicationWhenNodeDecommissionAndBackToInService } catch (Exception e) { return false; } - }, 1000, 30000); + }, 200, 30000); } }
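The notify path in patch 14 pairs with the replication monitor's timed wait: notifyNodeStateChange() only calls notify() when the monitor thread is parked (isThreadWaiting()) and the replication queue is empty, so a wake-up triggers a fresh processAll() pass without disturbing in-flight work. The monitor loop itself is not part of these diffs; the self-contained sketch below illustrates the underlying wait/notify pattern with purely illustrative names (MonitorWakeupDemo, runMonitorLoop, notifyStateChange) rather than Ozone code.

public final class MonitorWakeupDemo {

  private volatile boolean running = true;

  private synchronized void doWait(long intervalMillis) throws InterruptedException {
    // The monitor parks here in TIMED_WAITING; notifyStateChange() below can
    // end the wait before the configured interval elapses.
    wait(intervalMillis);
  }

  private void runMonitorLoop(long intervalMillis) {
    while (running) {
      processAll();
      try {
        doWait(intervalMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  synchronized void notifyStateChange() {
    // Mirrors the synchronized notify() used by notifyNodeStateChange().
    notify();
  }

  private void processAll() {
    System.out.println("processing containers at " + System.currentTimeMillis());
  }

  public static void main(String[] args) throws InterruptedException {
    MonitorWakeupDemo demo = new MonitorWakeupDemo();
    Thread monitor = new Thread(() -> demo.runMonitorLoop(300_000L), "replication-monitor");
    monitor.start();

    Thread.sleep(500);          // let the first pass finish and the thread park
    demo.notifyStateChange();   // simulate a node state change event

    Thread.sleep(500);
    demo.running = false;
    demo.notifyStateChange();   // wake the loop so it can observe running == false
    monitor.join();
  }
}

This is also why the integration test above stretches the configured replication interval to 300 seconds and then waits for isThreadWaiting(): with the periodic pass effectively disabled, any replication activity that follows a node state change can only be attributed to the event-driven wake-up.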