From 06df0b7afed97cca8a919f9c7cbc995818c10e69 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Thu, 8 Dec 2022 16:43:40 +0000 Subject: [PATCH 01/11] Allow deadline to be passed to the DN for any DN command --- .../endpoint/HeartbeatEndpointTask.java | 4 ++ .../ozone/protocol/commands/SCMCommand.java | 38 +++++++++++++++++++ .../ScmServerDatanodeHeartbeatProtocol.proto | 1 + .../scm/server/SCMDatanodeProtocolServer.java | 3 ++ 4 files changed, 46 insertions(+) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index ccb0e8b7d7d9..a694ba00becd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -426,6 +426,7 @@ private void processResponse(SCMHeartbeatResponseProto response, * Common processing for SCM commands. 
* - set term * - set encoded token + * - set any deadline which is relevant to the command * - add to context's queue */ private void processCommonCommand( @@ -436,6 +437,9 @@ private void processCommonCommand( if (response.hasEncodedToken()) { cmd.setEncodedToken(response.getEncodedToken()); } + if (response.hasDeadlineMsSinceEpoch()) { + cmd.setDeadline(response.getDeadlineMsSinceEpoch()); + } context.addCommand(cmd); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index 744118e3013f..40c91f43e986 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -38,6 +38,8 @@ public abstract class SCMCommand implements private String encodedToken = ""; + private long deadlineMsSinceEpoch = 0; + SCMCommand() { this.id = HddsIdFactory.getLongId(); } @@ -88,4 +90,40 @@ public String getEncodedToken() { public void setEncodedToken(String encodedToken) { this.encodedToken = encodedToken; } + + /** + * Allows a deadline to be set on the command. The deadline is set as the + * milliseconds since the epoch when the command must have been completed by. + * It is up to the code processing the command to enforce the deadline by + * calling the hasExpired() method, and the code sending the command to set + * the deadline. The default deadline is zero, which means no deadline. + * @param deadlineMs The ms since epoch when the command must have completed + * by. + */ + public void setDeadline(long deadlineMs) { + this.deadlineMsSinceEpoch = deadlineMs; + } + + /** + * @return The deadline set for this command, or zero if no deadline has been + * set. 
+ */ + public long getDeadline() { + return deadlineMsSinceEpoch; + } + + /** + * If a deadline has been set to a non zero value, test if the current time + * passed is beyond the deadline or not. + * @param currentEpochMs current time in milliseconds since the epoch. + * @return false if there is no deadline, or it has not expired. True if the + * set deadline has expired. + */ + public boolean hasExpired(long currentEpochMs) { + if (deadlineMsSinceEpoch > 0 && + currentEpochMs > deadlineMsSinceEpoch) { + return true; + } + return false; + } } diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 6465eeb40b47..7f8476d99500 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -343,6 +343,7 @@ message SCMCommandProto { // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM. optional int64 term = 15; optional string encodedToken = 16; + optional int64 deadlineMsSinceEpoch = 17; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index f2654fac027b..233f86d15531 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -313,6 +313,9 @@ public SCMCommandProto getCommandResponse(SCMCommand cmd) // In HA mode, it is the term of current leader SCM. // In non-HA mode, it is the default value 0. builder.setTerm(cmd.getTerm()); + // The default deadline is 0, which means no deadline. Individual commands + // may have a deadline set. 
+ builder.setDeadlineMsSinceEpoch(cmd.getDeadline()); switch (cmd.getType()) { case reregisterCommand: From 7d068fb83c30bf0a344248e44dc4dab4e1dee392 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Thu, 8 Dec 2022 17:16:55 +0000 Subject: [PATCH 02/11] Reduced ReplicationSupervisor constructors and injected a clock object to use for time and in tests --- .../statemachine/DatanodeStateMachine.java | 6 ++++- .../replication/ReplicationSupervisor.java | 26 +++++++------------ .../ReplicationSupervisorScheduling.java | 10 +++++-- .../TestReplicationSupervisor.java | 10 +++++-- .../freon/ClosedContainerReplicator.java | 9 ++++++- 5 files changed, 39 insertions(+), 22 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 4d72bb317f3e..c06daa2c99ee 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -18,6 +18,8 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Clock; +import java.time.ZoneId; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; import org.apache.hadoop.ozone.HddsDatanodeStopService; +import org.apache.hadoop.ozone.common.MonotonicClock; import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage; import org.apache.hadoop.ozone.container.common.report.ReportManager; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; @@ -139,6 
+142,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, this.conf = conf; this.datanodeDetails = datanodeDetails; + Clock clock = new MonotonicClock(ZoneId.systemDefault()); // Expected to be initialized already. layoutStorage = new DatanodeLayoutStorage(conf, datanodeDetails.getUuidString()); @@ -180,7 +184,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, conf.getObject(ReplicationConfig.class); supervisor = new ReplicationSupervisor(container.getContainerSet(), context, - replicatorMetrics, replicationConfig); + replicatorMetrics, replicationConfig, clock); replicationSupervisorMetrics = ReplicationSupervisorMetrics.create(supervisor); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 5432656e0363..03800c27bef9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.container.replication; +import java.time.Clock; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.ExecutorService; @@ -49,6 +50,7 @@ public class ReplicationSupervisor { private final ContainerReplicator replicator; private final ExecutorService executor; private final StateContext context; + private final Clock clock; private final AtomicLong requestCounter = new AtomicLong(); private final AtomicLong successCounter = new AtomicLong(); @@ -64,35 +66,27 @@ public class ReplicationSupervisor { @VisibleForTesting ReplicationSupervisor( ContainerSet containerSet, StateContext context, - ContainerReplicator replicator, ExecutorService executor) { 
+ ContainerReplicator replicator, ExecutorService executor, + Clock clock) { this.containerSet = containerSet; this.replicator = replicator; this.containersInFlight = ConcurrentHashMap.newKeySet(); this.executor = executor; this.context = context; + this.clock = clock; } public ReplicationSupervisor( ContainerSet containerSet, StateContext context, - ContainerReplicator replicator, ReplicationConfig replicationConfig) { - this(containerSet, context, replicator, - replicationConfig.getReplicationMaxStreams()); - } - - public ReplicationSupervisor( - ContainerSet containerSet, StateContext context, - ContainerReplicator replicator, int poolSize) { + ContainerReplicator replicator, ReplicationConfig replicationConfig, + Clock clock) { this(containerSet, context, replicator, new ThreadPoolExecutor( - poolSize, poolSize, 60, TimeUnit.SECONDS, + replicationConfig.getReplicationMaxStreams(), + replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("ContainerReplicationThread-%d") - .build())); - } - - public ReplicationSupervisor(ContainerSet containerSet, - ContainerReplicator replicator, int poolSize) { - this(containerSet, null, replicator, poolSize); + .build()), clock); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java index 457f0a58ae46..1ec6fecd40e8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.container.replication; +import java.time.ZoneId; import java.util.ArrayList; 
import java.util.HashMap; import java.util.List; @@ -24,8 +25,10 @@ import java.util.Random; import java.util.UUID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.ozone.common.MonotonicClock; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.junit.jupiter.api.Assertions; @@ -43,6 +46,9 @@ public class ReplicationSupervisorScheduling { @Test public void test() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + ReplicationServer.ReplicationConfig replicationConfig + = conf.getObject(ReplicationServer.ReplicationConfig.class); List datanodes = new ArrayList<>(); datanodes.add(MockDatanodeDetails.randomDatanodeDetails()); datanodes.add(MockDatanodeDetails.randomDatanodeDetails()); @@ -69,7 +75,7 @@ public void test() throws InterruptedException { ContainerSet cs = new ContainerSet(1000); - ReplicationSupervisor rs = new ReplicationSupervisor(cs, + ReplicationSupervisor rs = new ReplicationSupervisor(cs, null, //simplified executor emulating the current sequential download + //import. 
@@ -107,7 +113,7 @@ public void test() throws InterruptedException { } } - }, 10); + }, replicationConfig, new MonotonicClock(ZoneId.systemDefault())); final long start = System.currentTimeMillis(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 51f78440bb4c..aa104dad2a91 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -20,6 +20,9 @@ import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.List; import java.util.UUID; import java.util.concurrent.AbstractExecutorService; @@ -39,6 +42,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.TestClock; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -76,6 +80,7 @@ public class TestReplicationSupervisor { private ContainerSet set; private final ContainerLayoutVersion layout; + private Clock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); public TestReplicationSupervisor(ContainerLayoutVersion layout) { this.layout = layout; @@ -231,7 +236,7 @@ public void slowDownload() { public void testDownloadAndImportReplicatorFailure() { ReplicationSupervisor supervisor = new ReplicationSupervisor(set, null, mutableReplicator, - newDirectExecutorService()); + newDirectExecutorService(), clock); // Mock to fetch an exception in the importContainer method. 
SimpleContainerDownloader moc = @@ -265,7 +270,8 @@ private ReplicationSupervisor supervisorWith( Function replicatorFactory, ExecutorService executor) { ReplicationSupervisor supervisor = - new ReplicationSupervisor(set, null, mutableReplicator, executor); + new ReplicationSupervisor(set, null, mutableReplicator, executor, + clock); replicatorRef.set(replicatorFactory.apply(supervisor)); return supervisor; } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java index a7d332e7839a..059d74a10592 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.ozone.common.MonotonicClock; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Handler; @@ -37,6 +38,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.ContainerReplicator; import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator; +import org.apache.hadoop.ozone.container.replication.ReplicationServer; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.replication.ReplicationTask; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; @@ -48,6 +50,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import 
java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -203,7 +206,11 @@ private void initializeReplicationSupervisor(ConfigurationSource conf) new SimpleContainerDownloader(conf, null), new TarContainerPacker()); - supervisor = new ReplicationSupervisor(containerSet, replicator, 10); + ReplicationServer.ReplicationConfig replicationConfig + = conf.getObject(ReplicationServer.ReplicationConfig.class); + supervisor = new ReplicationSupervisor(containerSet, null, + replicator, replicationConfig, + new MonotonicClock(ZoneId.systemDefault())); } private void replicateContainer(long counter) throws Exception { From f650f30c96196421fd78a6bad4a75e6a5270b32e Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 9 Dec 2022 12:32:38 +0000 Subject: [PATCH 03/11] Use deadline in ReplicateContainerCommand --- .../ReplicateContainerCommandHandler.java | 5 ++- .../replication/ReplicationSupervisor.java | 14 ++++++++ .../ReplicationSupervisorMetrics.java | 5 ++- .../replication/ReplicationTask.java | 19 +++++++++++ .../TestReplicationSupervisor.java | 32 +++++++++++++++++-- 5 files changed, 71 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 44c783846ad1..a3ebac49a1c5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -66,12 +66,15 @@ public void handle(SCMCommand command, OzoneContainer container, final List sourceDatanodes = replicateCommand.getSourceDatanodes(); 
final long containerID = replicateCommand.getContainerID(); + final long deadline = replicateCommand.getDeadline(); Preconditions.checkArgument(sourceDatanodes.size() > 0, "Replication command is received for container %s " + "without source datanodes.", containerID); - supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes)); + ReplicationTask task = new ReplicationTask(containerID, sourceDatanodes); + task.setDeadline(deadline); + supervisor.addTask(task); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 03800c27bef9..444ab303f518 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -55,6 +55,7 @@ public class ReplicationSupervisor { private final AtomicLong requestCounter = new AtomicLong(); private final AtomicLong successCounter = new AtomicLong(); private final AtomicLong failureCounter = new AtomicLong(); + private final AtomicLong timeoutCounter = new AtomicLong(); /** * A set of container IDs that are currently being downloaded @@ -142,6 +143,14 @@ public void run() { try { requestCounter.incrementAndGet(); + if (task.getDeadline() > 0 && clock.millis() > task.getDeadline()) { + LOG.info("Ignoring this replicate container command for container" + + " {} since the current time {}ms is past the deadline {}ms", + containerId, clock.millis(), task.getDeadline()); + timeoutCounter.incrementAndGet(); + return; + } + if (context != null) { DatanodeDetails dn = context.getParent().getDatanodeDetails(); if (dn.getPersistedOpState() != @@ -200,4 +209,9 @@ public long getReplicationSuccessCount() { public long getReplicationFailureCount() { 
return failureCounter.get(); } + + public long getReplicationTimeoutCount() { + return timeoutCounter.get(); + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index df48abda4f3c..9c5eba40858a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -66,6 +66,9 @@ public void getMetrics(MetricsCollector collector, boolean all) { supervisor.getQueueSize()) .addGauge(Interns.info("numRequestedReplications", "Number of requested replications"), - supervisor.getReplicationRequestCount()); + supervisor.getReplicationRequestCount()) + .addGauge(Interns.info("numTimeoutReplications", + "Number of timeout replications in queue"), + supervisor.getReplicationTimeoutCount()); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index e6e0d0526b5b..f89522582016 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -36,6 +36,8 @@ public class ReplicationTask { private final Instant queued = Instant.now(); + private long deadlineMsSinceEpoch = 0; + /** * Counter for the transferred bytes. 
*/ @@ -49,6 +51,23 @@ public ReplicationTask( this.sources = sources; } + /** + * Set the time, in milliseconds since the epoch when this task should have + * completed by, otherwise it should be dropped. + * @param msSinceEpoch The task deadline in milliseconds since the epoch. + */ + public void setDeadline(long msSinceEpoch) { + this.deadlineMsSinceEpoch = msSinceEpoch; + } + + /** + * Returns any deadline set on this task, in milliseconds since the epoch. + * A returned value of zero indicates no deadline. + */ + public long getDeadline() { + return deadlineMsSinceEpoch; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index aa104dad2a91..cc8d8cf73ba5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -20,7 +20,6 @@ import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Clock; import java.time.Instant; import java.time.ZoneId; import java.util.List; @@ -80,7 +79,7 @@ public class TestReplicationSupervisor { private ContainerSet set; private final ContainerLayoutVersion layout; - private Clock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + private TestClock clock; public TestReplicationSupervisor(ContainerLayoutVersion layout) { this.layout = layout; @@ -93,6 +92,7 @@ public static Iterable parameters() { @Before public void setUp() throws Exception { + clock = new TestClock(Instant.now(), ZoneId.systemDefault()); set = new ContainerSet(1000); } @@ -261,6 +261,34 @@ public void testDownloadAndImportReplicatorFailure() { .contains("Container 1 
replication was unsuccessful.")); } + @Test + public void testTaskBeyondDeadline() { + ReplicationSupervisor supervisor = + supervisorWithReplicator(FakeReplicator::new); + + ReplicationTask task1 = new ReplicationTask(1L, emptyList()); + task1.setDeadline(clock.millis() + 10000); + ReplicationTask task2 = new ReplicationTask(2L, emptyList()); + task2.setDeadline(clock.millis() + 20000); + ReplicationTask task3 = new ReplicationTask(3L, emptyList()); + // no deadline set + + clock.fastForward(15000); + + supervisor.addTask(task1); + supervisor.addTask(task2); + supervisor.addTask(task3); + + Assert.assertEquals(3, supervisor.getReplicationRequestCount()); + Assert.assertEquals(2, supervisor.getReplicationSuccessCount()); + Assert.assertEquals(0, supervisor.getReplicationFailureCount()); + Assert.assertEquals(0, supervisor.getInFlightReplications()); + Assert.assertEquals(0, supervisor.getQueueSize()); + Assert.assertEquals(1, supervisor.getReplicationTimeoutCount()); + Assert.assertEquals(2, set.containerCount()); + + } + private ReplicationSupervisor supervisorWithReplicator( Function replicatorFactory) { return supervisorWith(replicatorFactory, newDirectExecutorService()); From e9079889a621c858f587cbc64d31d57c4ed286e8 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 9 Dec 2022 15:34:09 +0000 Subject: [PATCH 04/11] Add deadline to the delete container handler --- .../statemachine/DatanodeStateMachine.java | 2 +- .../DeleteContainerCommandHandler.java | 26 ++++++- .../TestDeleteContainerCommandHandler.java | 71 +++++++++++++++++++ 3 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c06daa2c99ee..d68dfd969122 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -211,7 +211,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new ReconstructECContainersCommandHandler(conf, ecReconstructionSupervisor)) .addHandler(new DeleteContainerCommandHandler( - dnConf.getContainerDeleteThreads())) + dnConf.getContainerDeleteThreads(), clock)) .addHandler(new ClosePipelineCommandHandler()) .addHandler(new CreatePipelineCommandHandler(conf)) .addHandler(new SetNodeOperationalStateCommandHandler(conf)) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index 58ad2d18e4ff..b209e6ee3412 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Clock; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @@ -48,15 +49,22 @@ public class DeleteContainerCommandHandler implements CommandHandler { LoggerFactory.getLogger(DeleteContainerCommandHandler.class); private final AtomicInteger invocationCount = new 
AtomicInteger(0); + private final AtomicInteger timeoutCount = new AtomicInteger(0); private final AtomicLong totalTime = new AtomicLong(0); private final ExecutorService executor; + private final Clock clock; - public DeleteContainerCommandHandler(int threadPoolSize) { - this.executor = Executors.newFixedThreadPool( + public DeleteContainerCommandHandler(int threadPoolSize, Clock clock) { + this(clock, Executors.newFixedThreadPool( threadPoolSize, new ThreadFactoryBuilder() - .setNameFormat("DeleteContainerThread-%d").build()); + .setNameFormat("DeleteContainerThread-%d").build())); } + protected DeleteContainerCommandHandler(Clock clock, + ExecutorService executor) { + this.executor = executor; + this.clock = clock; + } @Override public void handle(final SCMCommand command, final OzoneContainer ozoneContainer, @@ -69,6 +77,14 @@ public void handle(final SCMCommand command, final long startTime = Time.monotonicNow(); invocationCount.incrementAndGet(); try { + if (command.hasExpired(clock.millis())) { + LOG.info("Not processing the delete container command for " + + "container {} as the current time {}ms is after the command " + + "deadline {}ms", deleteContainerCommand.getContainerID(), + clock.millis(), command.getDeadline()); + timeoutCount.incrementAndGet(); + return; + } controller.deleteContainer(deleteContainerCommand.getContainerID(), deleteContainerCommand.isForce()); } catch (IOException e) { @@ -94,6 +110,10 @@ public int getInvocationCount() { return this.invocationCount.get(); } + public int getTimeoutCount() { + return this.timeoutCount.get(); + } + @Override public long getAverageRunTime() { final int invocations = invocationCount.get(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java 
new file mode 100644 index 000000000000..282e00f734a5 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; +import org.apache.ozone.test.TestClock; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.mockito.Mockito; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; + +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static org.mockito.Mockito.times; + +/** + * Test for the DeleteContainerCommandHandler. 
+ */ +public class TestDeleteContainerCommandHandler { + + @Test + public void testExpiredCommandsAreNotProcessed() throws IOException { + TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + DeleteContainerCommandHandler handler = + new DeleteContainerCommandHandler(clock, newDirectExecutorService()); + OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class); + ContainerController controller = Mockito.mock(ContainerController.class); + Mockito.when(ozoneContainer.getController()).thenReturn(controller); + + DeleteContainerCommand command1 = new DeleteContainerCommand(1L); + command1.setDeadline(clock.millis() + 10000); + DeleteContainerCommand command2 = new DeleteContainerCommand(2L); + command2.setDeadline(clock.millis() + 20000); + DeleteContainerCommand command3 = new DeleteContainerCommand(3L); + // No deadline on the 3rd command + + clock.fastForward(15000); + handler.handle(command1, ozoneContainer, null, null); + Assertions.assertEquals(1, handler.getTimeoutCount()); + handler.handle(command2, ozoneContainer, null, null); + handler.handle(command3, ozoneContainer, null, null); + Assertions.assertEquals(1, handler.getTimeoutCount()); + Assertions.assertEquals(3, handler.getInvocationCount()); + Mockito.verify(controller, times(0)) + .deleteContainer(1L, false); + Mockito.verify(controller, times(1)) + .deleteContainer(2L, false); + Mockito.verify(controller, times(1)) + .deleteContainer(3L, false); + } + +} From 187dc8b334f3a08d07ec3f90d641744db7dcc77d Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 12 Dec 2022 11:09:37 +0000 Subject: [PATCH 05/11] Add deadline to the EC Reconstruction command handler --- .../statemachine/DatanodeStateMachine.java | 2 +- ...ReconstructECContainersCommandHandler.java | 3 +- .../ECReconstructionCommandInfo.java | 8 ++- .../ECReconstructionCoordinatorTask.java | 20 ++++++- .../ECReconstructionSupervisor.java | 11 ++-- .../TestECReconstructionSupervisor.java | 57 ++++++++++++++++++- 6 files 
changed, 89 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index d68dfd969122..107dba186cae 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -197,7 +197,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, ecReconstructionSupervisor = new ECReconstructionSupervisor(container.getContainerSet(), context, replicationConfig.getReplicationMaxStreams(), - ecReconstructionCoordinator); + ecReconstructionCoordinator, clock); // When we add new handlers just adding a new handler here should do the diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index 57d4d16f8ab1..fe89ccd2b09a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -51,7 +51,8 @@ public void handle(SCMCommand command, OzoneContainer container, ecContainersCommand.getEcReplicationConfig(), ecContainersCommand.getMissingContainerIndexes(), ecContainersCommand.getSources(), - ecContainersCommand.getTargetDatanodes()); + ecContainersCommand.getTargetDatanodes(), + 
ecContainersCommand.getDeadline()); this.supervisor.addTask(reconstructionCommandInfo); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java index c95f9646f859..7bc18ed49f9d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java @@ -35,17 +35,23 @@ public class ECReconstructionCommandInfo { private List sources; private List targetDatanodes; + private long deadlineMsSinceEpoch = 0; public ECReconstructionCommandInfo(long containerID, ECReplicationConfig ecReplicationConfig, byte[] missingContainerIndexes, List sources, - List targetDatanodes) { + List targetDatanodes, long deadlineMsSinceEpoch) { this.containerID = containerID; this.ecReplicationConfig = ecReplicationConfig; this.missingContainerIndexes = Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length); this.sources = sources; this.targetDatanodes = targetDatanodes; + this.deadlineMsSinceEpoch = deadlineMsSinceEpoch; + } + + public long getDeadline() { + return deadlineMsSinceEpoch; } public long getContainerID() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java index e0aa14419a4f..4e46860c889e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Clock; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -36,17 +37,21 @@ public class ECReconstructionCoordinatorTask implements Runnable { static final Logger LOG = LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class); private final ConcurrentHashMap.KeySetView inprogressCounter; - private ECReconstructionCoordinator reconstructionCoordinator; - private ECReconstructionCommandInfo reconstructionCommandInfo; + private final ECReconstructionCoordinator reconstructionCoordinator; + private final ECReconstructionCommandInfo reconstructionCommandInfo; + private long deadlineMsSinceEpoch = 0; + private final Clock clock; public ECReconstructionCoordinatorTask( ECReconstructionCoordinator coordinator, ECReconstructionCommandInfo reconstructionCommandInfo, ConcurrentHashMap.KeySetView - inprogressReconstructionCoordinatorCounter) { + inprogressReconstructionCoordinatorCounter, + Clock clock) { this.reconstructionCoordinator = coordinator; this.reconstructionCommandInfo = reconstructionCommandInfo; this.inprogressCounter = inprogressReconstructionCoordinatorCounter; + this.clock = clock; } @Override @@ -69,6 +74,15 @@ public void run() { containerID); } try { + if (reconstructionCommandInfo.getDeadline() > 0 + && clock.millis() > reconstructionCommandInfo.getDeadline()) { + LOG.info("Ignoring this reconstruct container command for container" + + " {} since the current time {}ms is past the deadline {}ms", + containerID, clock.millis(), + reconstructionCommandInfo.getDeadline()); + return; + } + SortedMap sourceNodeMap = reconstructionCommandInfo.getSources().stream().collect(Collectors .toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex, diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java index e2de7ac6959e..e36636d69356 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java @@ -23,6 +23,7 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Clock; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -39,6 +40,7 @@ public class ECReconstructionSupervisor implements Closeable { private final StateContext context; private final ExecutorService executor; private final ECReconstructionCoordinator reconstructionCoordinator; + private final Clock clock; /** * how many coordinator tasks currently being running. */ @@ -47,18 +49,19 @@ public class ECReconstructionSupervisor implements Closeable { public ECReconstructionSupervisor(ContainerSet containerSet, StateContext context, ExecutorService executor, - ECReconstructionCoordinator coordinator) { + ECReconstructionCoordinator coordinator, Clock clock) { this.containerSet = containerSet; this.context = context; this.executor = executor; this.reconstructionCoordinator = coordinator; this.inProgressReconstrucionCoordinatorCounter = ConcurrentHashMap.newKeySet(); + this.clock = clock; } public ECReconstructionSupervisor(ContainerSet containerSet, StateContext context, int poolSize, - ECReconstructionCoordinator coordinator) { + ECReconstructionCoordinator coordinator, Clock clock) { // TODO: ReplicationSupervisor and this class can be refactored to have a // common interface. 
this(containerSet, context, @@ -66,7 +69,7 @@ public ECReconstructionSupervisor(ContainerSet containerSet, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("ECContainerReconstructionThread-%d").build()), - coordinator); + coordinator, clock); } public void stop() { @@ -86,7 +89,7 @@ public void addTask(ECReconstructionCommandInfo taskInfo) { .add(taskInfo.getContainerID())) { executor.execute( new ECReconstructionCoordinatorTask(getReconstructionCoordinator(), - taskInfo, inProgressReconstrucionCoordinatorCounter)); + taskInfo, inProgressReconstrucionCoordinatorCounter, clock)); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java index c40ceb2ea36c..006573b23216 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java @@ -22,19 +22,37 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; import java.util.SortedMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static 
org.mockito.Mockito.times; + /** * Tests the ECReconstructionSupervisor. */ public class TestECReconstructionSupervisor { + private TestClock clock; + + @BeforeEach + public void setup() { + clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + } + + @Test public void testAddTaskShouldExecuteTheGivenTask() throws InterruptedException, TimeoutException, IOException { @@ -58,15 +76,50 @@ public void reconstructECContainerGroup(long containerID, super.reconstructECContainerGroup(containerID, repConfig, sourceNodeMap, targetNodeMap); } - }) { + }, clock) { }; supervisor.addTask( new ECReconstructionCommandInfo(1, new ECReplicationConfig(3, 2), - new byte[0], ImmutableList.of(), ImmutableList.of())); + new byte[0], ImmutableList.of(), ImmutableList.of(), 0)); runnableInvoked.await(); Assertions.assertEquals(1, supervisor.getInFlightReplications()); holdProcessing.countDown(); GenericTestUtils .waitFor(() -> supervisor.getInFlightReplications() == 0, 100, 15000); } + + @Test + public void testTasksWithDeadlineExceededAreNotRun() throws IOException { + ECReconstructionCoordinator coordinator = + Mockito.mock(ECReconstructionCoordinator.class); + ECReconstructionSupervisor supervisor = + new ECReconstructionSupervisor(null, null, + newDirectExecutorService(), coordinator, clock); + + ECReconstructionCommandInfo task1 = new ECReconstructionCommandInfo(1L, + new ECReplicationConfig(3, 2), new byte[0], ImmutableList.of(), + ImmutableList.of(), 0); + ECReconstructionCommandInfo task2 = new ECReconstructionCommandInfo(2L, + new ECReplicationConfig(3, 2), new byte[0], ImmutableList.of(), + ImmutableList.of(), clock.millis() + 10000); + ECReconstructionCommandInfo task3 = new ECReconstructionCommandInfo(3L, + new ECReplicationConfig(3, 2), new byte[0], ImmutableList.of(), + ImmutableList.of(), clock.millis() + 20000); + + clock.fastForward(15000); + supervisor.addTask(task1); + supervisor.addTask(task2); + supervisor.addTask(task3); + + // No deadline for container 1, 
it should run. + Mockito.verify(coordinator, times(1)) + .reconstructECContainerGroup(eq(1L), any(), any(), any()); + // Deadline passed for container 2, it should not run. + Mockito.verify(coordinator, times(0)) + .reconstructECContainerGroup(eq(2L), any(), any(), any()); + // Deadline not passed for container 3, it should run. + Mockito.verify(coordinator, times(1)) + .reconstructECContainerGroup(eq(3L), any(), any(), any()); + } + } From ae1dcb90b74ef97f7c1af37f6602d02e97cba0fb Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 12 Dec 2022 12:48:29 +0000 Subject: [PATCH 06/11] Set deadline on commands sent from ReplicationManager --- .../replication/ReplicationManager.java | 30 +++++++++++++++++++ .../replication/TestReplicationManager.java | 9 ++++++ 2 files changed, 39 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index f78536912ebc..86a4d5ca0673 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -417,6 +417,8 @@ public void sendDatanodeCommand(SCMCommand command, LOG.info("Sending command of type {} for container {} to {}", command.getType(), containerInfo, target); command.setTerm(getScmTerm()); + command.setDeadline(clock.millis() + + Math.round(rmConf.eventTimeout * rmConf.commandDeadlineFactor)); final CommandForDatanode datanodeCommand = new CommandForDatanode<>(target.getUuid(), command); eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); @@ -765,6 +767,34 @@ public void setEventTimeout(Duration timeout) { this.eventTimeout = timeout.toMillis(); } + /** + * Deadline which should be set on commands sent from ReplicationManager + * to the 
datanodes, as a fraction of the event.timeout. If the command + * has not been processed on the datanode by this time, it will be dropped + * by the datanode and Replication Manager will need to resend it. + */ + @Config(key = "command.deadline.factor", + type = ConfigType.DOUBLE, + defaultValue = "0.9", + tags = {SCM, OZONE}, + description = "Fraction of the hdds.scm.replication.event.timeout " + + "from the current time which should be set as a deadline for " + + "commands sent from ReplicationManager to datanodes. " + + "Commands which are not processed before this deadline will be " + + "dropped by the datanodes. Should be a value > 0 and <= 1.") + private double commandDeadlineFactor = 0.9; + public double getCommandDeadlineFactor() { + return commandDeadlineFactor; + } + + public void setCommandDeadlineFactor(double val) { + if (!(val > 0) || (val > 1)) { + throw new IllegalArgumentException(val + + " must be greater than 0 and less than equal to 1"); + } + commandDeadlineFactor = val; + } + /** * The number of container replica which must be available for a node to * enter maintenance. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 82f971be799e..92ec4995706f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -477,6 +477,15 @@ public void testSendDatanodeReplicateCommand() throws NotLeaderException { replicationManager.sendDatanodeCommand(command, containerInfo, target); + // Ensure that the command deadline is set to current time + // + eventTimeout * factor + ReplicationManager.ReplicationManagerConfiguration rmConf = configuration + .getObject(ReplicationManager.ReplicationManagerConfiguration.class); + long expectedDeadline = clock.millis() + + Math.round(rmConf.getEventTimeout() * + rmConf.getCommandDeadlineFactor()); + Assert.assertEquals(expectedDeadline, command.getDeadline()); + List ops = containerReplicaPendingOps.getPendingOps( containerInfo.containerID()); Mockito.verify(eventPublisher).fireEvent(any(), any()); From 8f1c8aa4e628e863ed6014e437074ba8e9ee9c1d Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 12 Dec 2022 13:08:01 +0000 Subject: [PATCH 07/11] Add test to ensure term and deadline are set in datanode proto server --- .../scm/server/SCMDatanodeProtocolServer.java | 6 +-- .../scm/TestSCMDatanodeProtocolServer.java | 54 +++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 
233f86d15531..467adbe23a4d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -272,7 +272,7 @@ public SCMHeartbeatResponseProto sendHeartbeat( SCMHeartbeatRequestProto heartbeat) throws IOException, TimeoutException { List cmdResponses = new ArrayList<>(); for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) { - cmdResponses.add(getCommandResponse(cmd)); + cmdResponses.add(getCommandResponse(cmd, scm)); } boolean auditSuccess = true; Map auditMap = Maps.newHashMap(); @@ -305,8 +305,8 @@ public SCMHeartbeatResponseProto sendHeartbeat( * @throws IOException */ @VisibleForTesting - public SCMCommandProto getCommandResponse(SCMCommand cmd) - throws IOException, TimeoutException { + public static SCMCommandProto getCommandResponse(SCMCommand cmd, + OzoneStorageContainerManager scm) throws IOException, TimeoutException { SCMCommandProto.Builder builder = SCMCommandProto.newBuilder() .setEncodedToken(cmd.getEncodedToken()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java new file mode 100644 index 000000000000..cfa34fc445a0 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.TimeoutException; + +/** + * Test for SCMDatanodeProtocolServer. + */ +public class TestSCMDatanodeProtocolServer { + + @Test + public void ensureTermAndDeadlineOnCommands() + throws IOException, TimeoutException { + OzoneStorageContainerManager scm = + Mockito.mock(OzoneStorageContainerManager.class); + + ReplicateContainerCommand command = new ReplicateContainerCommand(1L, + Collections.emptyList()); + command.setTerm(5L); + command.setDeadline(1234L); + StorageContainerDatanodeProtocolProtos.SCMCommandProto proto = + SCMDatanodeProtocolServer.getCommandResponse(command, scm); + + Assert.assertEquals(StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.replicateContainerCommand, proto.getCommandType()); + Assert.assertEquals(5L, proto.getTerm()); + Assert.assertEquals(1234L, proto.getDeadlineMsSinceEpoch()); + } +} From 844865cfd5cdafee05298c0a52300d2eb4a43401 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 12 Dec 2022 17:52:07 +0000 Subject: [PATCH 08/11] Make ReplicationTask take ReplicateContainerCommand in its constructor --- .../ReplicateContainerCommandHandler.java | 3 +-- 
.../replication/ReplicationTask.java | 24 +++++++++---------- .../TestReplicationSupervisor.java | 16 +++++++++---- .../freon/ClosedContainerReplicator.java | 6 +++-- 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index a3ebac49a1c5..10057ebb722f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -72,8 +72,7 @@ public void handle(SCMCommand command, OzoneContainer container, "Replication command is received for container %s " + "without source datanodes.", containerID); - ReplicationTask task = new ReplicationTask(containerID, sourceDatanodes); - task.setDeadline(deadline); + ReplicationTask task = new ReplicationTask(replicateCommand); supervisor.addTask(task); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index f89522582016..b194c2e2077a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -22,6 +22,7 @@ import java.util.Objects; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; /** * The task to download a container from 
the sources. @@ -32,7 +33,7 @@ public class ReplicationTask { private final long containerId; - private List sources; + private final List sources; private final Instant queued = Instant.now(); @@ -43,21 +44,20 @@ public class ReplicationTask { */ private long transferredBytes; - public ReplicationTask( - long containerId, - List sources - ) { - this.containerId = containerId; - this.sources = sources; + public ReplicationTask(ReplicateContainerCommand cmd) { + this.containerId = cmd.getContainerID(); + this.sources = cmd.getSourceDatanodes(); + this.deadlineMsSinceEpoch = cmd.getDeadline(); } /** - * Set the time, in milliseconds since the epoch when this task should have - * completed by, otherwise it should be dropped. - * @param msSinceEpoch The task deadline in milliseconds since the epoch. + * Intended to only be used in tests. */ - public void setDeadline(long msSinceEpoch) { - this.deadlineMsSinceEpoch = msSinceEpoch; + protected ReplicationTask( + long containerId, + List sources + ) { + this(new ReplicateContainerCommand(containerId, sources)); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index cc8d8cf73ba5..6ca50854783a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.TestClock; import org.junit.After; @@ -266,11 +267,16 @@ 
public void testTaskBeyondDeadline() { ReplicationSupervisor supervisor = supervisorWithReplicator(FakeReplicator::new); - ReplicationTask task1 = new ReplicationTask(1L, emptyList()); - task1.setDeadline(clock.millis() + 10000); - ReplicationTask task2 = new ReplicationTask(2L, emptyList()); - task2.setDeadline(clock.millis() + 20000); - ReplicationTask task3 = new ReplicationTask(3L, emptyList()); + ReplicateContainerCommand cmd = new ReplicateContainerCommand(1L, + emptyList()); + cmd.setDeadline(clock.millis() + 10000); + ReplicationTask task1 = new ReplicationTask(cmd); + cmd = new ReplicateContainerCommand(2L, emptyList()); + cmd.setDeadline(clock.millis() + 20000); + ReplicationTask task2 = new ReplicationTask(cmd); + cmd = new ReplicateContainerCommand(3L, emptyList()); + // No deadline set + ReplicationTask task3 = new ReplicationTask(cmd); // no deadline set clock.fastForward(15000); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java index 059d74a10592..ebfa2d863765 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java @@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.replication.ReplicationTask; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.jetbrains.annotations.NotNull; import picocli.CommandLine.Command; import picocli.CommandLine.Option; @@ -126,8 +127,9 @@ public Void call() throws Exception { //if datanode is specified, replicate only container if it has a //replica. 
if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) { - replicationTasks.add(new ReplicationTask(container.getContainerID(), - datanodesWithContainer)); + replicationTasks.add(new ReplicationTask( + new ReplicateContainerCommand(container.getContainerID(), + datanodesWithContainer))); } } From 1ea551594974c2ccd286af04d5f33638be4c025c Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 12 Dec 2022 17:59:42 +0000 Subject: [PATCH 09/11] General review comments --- .../replication/ReplicationSupervisorMetrics.java | 2 +- .../hadoop/ozone/protocol/commands/SCMCommand.java | 7 ++----- .../container/replication/ReplicationManager.java | 14 ++++++++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index 9c5eba40858a..0576308bd2e0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -68,7 +68,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { "Number of requested replications"), supervisor.getReplicationRequestCount()) .addGauge(Interns.info("numTimeoutReplications", - "Number of timeout replications in queue"), + "Number of replication requests timed out before being processed"), supervisor.getReplicationTimeoutCount()); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index 40c91f43e986..ab214ef2f670 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -120,10 +120,7 @@ public long getDeadline() { * set deadline has expired. */ public boolean hasExpired(long currentEpochMs) { - if (deadlineMsSinceEpoch > 0 && - currentEpochMs > deadlineMsSinceEpoch) { - return true; - } - return false; + return deadlineMsSinceEpoch > 0 && + currentEpochMs > deadlineMsSinceEpoch; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 86a4d5ca0673..261803a3babf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigType; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.PostConstruct; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; @@ -788,10 +789,6 @@ public double getCommandDeadlineFactor() { } public void setCommandDeadlineFactor(double val) { - if (!(val > 0) || (val > 1)) { - throw new IllegalArgumentException(val - + " must be greater than 0 and less than equal to 1"); - } commandDeadlineFactor = val; } @@ -841,6 +838,15 @@ public void setMaintenanceReplicaMinimum(int replicaCount) { ) private int maintenanceRemainingRedundancy = 1; + @PostConstruct + public void validate() { + if (!(commandDeadlineFactor > 0) || 
(commandDeadlineFactor > 1)) { + throw new IllegalArgumentException("command.deadline.factor is set to " + + commandDeadlineFactor + + " and must be greater than 0 and less than equal to 1"); + } + } + public void setMaintenanceRemainingRedundancy(int redundancy) { this.maintenanceRemainingRedundancy = redundancy; } From 862130df0365c4e9da1e38393487f45578a61834 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 12 Dec 2022 18:27:48 +0000 Subject: [PATCH 10/11] Make ECReconstructionCommandInfo take a command object rather than the parameters of the command --- ...ReconstructECContainersCommandHandler.java | 7 +--- .../ECReconstructionCommandInfo.java | 18 +++++----- .../TestECReconstructionSupervisor.java | 36 ++++++++++++------- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index fe89ccd2b09a..c6abfc27c33a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -47,12 +47,7 @@ public void handle(SCMCommand command, OzoneContainer container, ReconstructECContainersCommand ecContainersCommand = (ReconstructECContainersCommand) command; ECReconstructionCommandInfo reconstructionCommandInfo = - new ECReconstructionCommandInfo(ecContainersCommand.getContainerID(), - ecContainersCommand.getEcReplicationConfig(), - ecContainersCommand.getMissingContainerIndexes(), - ecContainersCommand.getSources(), - ecContainersCommand.getTargetDatanodes(), - 
ecContainersCommand.getDeadline()); + new ECReconstructionCommandInfo(ecContainersCommand); this.supervisor.addTask(reconstructionCommandInfo); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java index 7bc18ed49f9d..8a4d26b55ca1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java @@ -37,17 +37,15 @@ public class ECReconstructionCommandInfo { private List targetDatanodes; private long deadlineMsSinceEpoch = 0; - public ECReconstructionCommandInfo(long containerID, - ECReplicationConfig ecReplicationConfig, byte[] missingContainerIndexes, - List sources, - List targetDatanodes, long deadlineMsSinceEpoch) { - this.containerID = containerID; - this.ecReplicationConfig = ecReplicationConfig; + public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) { + this.containerID = cmd.getContainerID(); + this.ecReplicationConfig = cmd.getEcReplicationConfig(); this.missingContainerIndexes = - Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length); - this.sources = sources; - this.targetDatanodes = targetDatanodes; - this.deadlineMsSinceEpoch = deadlineMsSinceEpoch; + Arrays.copyOf(cmd.getMissingContainerIndexes(), + cmd.getMissingContainerIndexes().length); + this.sources = cmd.getSources(); + this.targetDatanodes = cmd.getTargetDatanodes(); + this.deadlineMsSinceEpoch = cmd.getDeadline(); } public long getDeadline() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java index 006573b23216..f0b6ff3c368a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.Assertions; @@ -78,9 +79,10 @@ public void reconstructECContainerGroup(long containerID, } }, clock) { }; - supervisor.addTask( - new ECReconstructionCommandInfo(1, new ECReplicationConfig(3, 2), - new byte[0], ImmutableList.of(), ImmutableList.of(), 0)); + ReconstructECContainersCommand command = new ReconstructECContainersCommand( + 1L, ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + supervisor.addTask(new ECReconstructionCommandInfo(command)); runnableInvoked.await(); Assertions.assertEquals(1, supervisor.getInFlightReplications()); holdProcessing.countDown(); @@ -96,15 +98,25 @@ public void testTasksWithDeadlineExceededAreNotRun() throws IOException { new ECReconstructionSupervisor(null, null, newDirectExecutorService(), coordinator, clock); - ECReconstructionCommandInfo task1 = new ECReconstructionCommandInfo(1L, - new ECReplicationConfig(3, 2), new byte[0], ImmutableList.of(), - ImmutableList.of(), 0); - ECReconstructionCommandInfo task2 = new ECReconstructionCommandInfo(2L, - new ECReplicationConfig(3, 2), new byte[0], ImmutableList.of(), - ImmutableList.of(), clock.millis() + 10000); - 
ECReconstructionCommandInfo task3 = new ECReconstructionCommandInfo(3L, - new ECReplicationConfig(3, 2), new byte[0], ImmutableList.of(), - ImmutableList.of(), clock.millis() + 20000); + ReconstructECContainersCommand command = new ReconstructECContainersCommand( + 1L, ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + ECReconstructionCommandInfo task1 = + new ECReconstructionCommandInfo(command); + + command = new ReconstructECContainersCommand( + 2L, ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + command.setDeadline(clock.millis() + 10000); + ECReconstructionCommandInfo task2 = + new ECReconstructionCommandInfo(command); + + command = new ReconstructECContainersCommand( + 3L, ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + command.setDeadline(clock.millis() + 20000); + ECReconstructionCommandInfo task3 = + new ECReconstructionCommandInfo(command); clock.fastForward(15000); supervisor.addTask(task1); From 2af6bb983ba814f0d5aa8cf63197bf2555587e52 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 12 Dec 2022 20:01:15 +0000 Subject: [PATCH 11/11] Fix findbugs dead store --- .../commandhandler/ReplicateContainerCommandHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 10057ebb722f..df589e287d87 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -66,7 +66,6 @@ 
public void handle(SCMCommand command, OzoneContainer container, final List sourceDatanodes = replicateCommand.getSourceDatanodes(); final long containerID = replicateCommand.getContainerID(); - final long deadline = replicateCommand.getDeadline(); Preconditions.checkArgument(sourceDatanodes.size() > 0, "Replication command is received for container %s "