diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 4d72bb317f3e..107dba186cae 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -18,6 +18,8 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Clock; +import java.time.ZoneId; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; import org.apache.hadoop.ozone.HddsDatanodeStopService; +import org.apache.hadoop.ozone.common.MonotonicClock; import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage; import org.apache.hadoop.ozone.container.common.report.ReportManager; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; @@ -139,6 +142,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, this.conf = conf; this.datanodeDetails = datanodeDetails; + Clock clock = new MonotonicClock(ZoneId.systemDefault()); // Expected to be initialized already. 
layoutStorage = new DatanodeLayoutStorage(conf, datanodeDetails.getUuidString()); @@ -180,7 +184,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, conf.getObject(ReplicationConfig.class); supervisor = new ReplicationSupervisor(container.getContainerSet(), context, - replicatorMetrics, replicationConfig); + replicatorMetrics, replicationConfig, clock); replicationSupervisorMetrics = ReplicationSupervisorMetrics.create(supervisor); @@ -193,7 +197,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, ecReconstructionSupervisor = new ECReconstructionSupervisor(container.getContainerSet(), context, replicationConfig.getReplicationMaxStreams(), - ecReconstructionCoordinator); + ecReconstructionCoordinator, clock); // When we add new handlers just adding a new handler here should do the @@ -207,7 +211,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new ReconstructECContainersCommandHandler(conf, ecReconstructionSupervisor)) .addHandler(new DeleteContainerCommandHandler( - dnConf.getContainerDeleteThreads())) + dnConf.getContainerDeleteThreads(), clock)) .addHandler(new ClosePipelineCommandHandler()) .addHandler(new CreatePipelineCommandHandler(conf)) .addHandler(new SetNodeOperationalStateCommandHandler(conf)) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index 58ad2d18e4ff..b209e6ee3412 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import 
java.io.IOException; +import java.time.Clock; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @@ -48,15 +49,22 @@ public class DeleteContainerCommandHandler implements CommandHandler { LoggerFactory.getLogger(DeleteContainerCommandHandler.class); private final AtomicInteger invocationCount = new AtomicInteger(0); + private final AtomicInteger timeoutCount = new AtomicInteger(0); private final AtomicLong totalTime = new AtomicLong(0); private final ExecutorService executor; + private final Clock clock; - public DeleteContainerCommandHandler(int threadPoolSize) { - this.executor = Executors.newFixedThreadPool( + public DeleteContainerCommandHandler(int threadPoolSize, Clock clock) { + this(clock, Executors.newFixedThreadPool( threadPoolSize, new ThreadFactoryBuilder() - .setNameFormat("DeleteContainerThread-%d").build()); + .setNameFormat("DeleteContainerThread-%d").build())); } + protected DeleteContainerCommandHandler(Clock clock, + ExecutorService executor) { + this.executor = executor; + this.clock = clock; + } @Override public void handle(final SCMCommand command, final OzoneContainer ozoneContainer, @@ -69,6 +77,14 @@ public void handle(final SCMCommand command, final long startTime = Time.monotonicNow(); invocationCount.incrementAndGet(); try { + if (command.hasExpired(clock.millis())) { + LOG.info("Not processing the delete container command for " + + "container {} as the current time {}ms is after the command " + + "deadline {}ms", deleteContainerCommand.getContainerID(), + clock.millis(), command.getDeadline()); + timeoutCount.incrementAndGet(); + return; + } controller.deleteContainer(deleteContainerCommand.getContainerID(), deleteContainerCommand.isForce()); } catch (IOException e) { @@ -94,6 +110,10 @@ public int getInvocationCount() { return this.invocationCount.get(); } + public int getTimeoutCount() { + return this.timeoutCount.get(); + } + @Override public long 
getAverageRunTime() { final int invocations = invocationCount.get(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index 57d4d16f8ab1..c6abfc27c33a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -47,11 +47,7 @@ public void handle(SCMCommand command, OzoneContainer container, ReconstructECContainersCommand ecContainersCommand = (ReconstructECContainersCommand) command; ECReconstructionCommandInfo reconstructionCommandInfo = - new ECReconstructionCommandInfo(ecContainersCommand.getContainerID(), - ecContainersCommand.getEcReplicationConfig(), - ecContainersCommand.getMissingContainerIndexes(), - ecContainersCommand.getSources(), - ecContainersCommand.getTargetDatanodes()); + new ECReconstructionCommandInfo(ecContainersCommand); this.supervisor.addTask(reconstructionCommandInfo); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 44c783846ad1..df589e287d87 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -71,7 +71,8 @@ public void handle(SCMCommand command, OzoneContainer container, "Replication command is received for container %s " + "without source datanodes.", containerID); - supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes)); + ReplicationTask task = new ReplicationTask(replicateCommand); + supervisor.addTask(task); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index ccb0e8b7d7d9..a694ba00becd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -426,6 +426,7 @@ private void processResponse(SCMHeartbeatResponseProto response, * Common processing for SCM commands. 
* - set term * - set encoded token + * - any deadline which is relevant to the command * - add to context's queue */ private void processCommonCommand( @@ -436,6 +437,9 @@ private void processCommonCommand( if (response.hasEncodedToken()) { cmd.setEncodedToken(response.getEncodedToken()); } + if (response.hasDeadlineMsSinceEpoch()) { + cmd.setDeadline(response.getDeadlineMsSinceEpoch()); + } context.addCommand(cmd); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java index c95f9646f859..8a4d26b55ca1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java @@ -35,17 +35,21 @@ public class ECReconstructionCommandInfo { private List sources; private List targetDatanodes; + private long deadlineMsSinceEpoch = 0; - public ECReconstructionCommandInfo(long containerID, - ECReplicationConfig ecReplicationConfig, byte[] missingContainerIndexes, - List sources, - List targetDatanodes) { - this.containerID = containerID; - this.ecReplicationConfig = ecReplicationConfig; + public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) { + this.containerID = cmd.getContainerID(); + this.ecReplicationConfig = cmd.getEcReplicationConfig(); this.missingContainerIndexes = - Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length); - this.sources = sources; - this.targetDatanodes = targetDatanodes; + Arrays.copyOf(cmd.getMissingContainerIndexes(), + cmd.getMissingContainerIndexes().length); + this.sources = cmd.getSources(); + this.targetDatanodes = cmd.getTargetDatanodes(); + this.deadlineMsSinceEpoch = cmd.getDeadline(); + } 
+ + public long getDeadline() { + return deadlineMsSinceEpoch; } public long getContainerID() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java index e0aa14419a4f..4e46860c889e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Clock; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -36,17 +37,21 @@ public class ECReconstructionCoordinatorTask implements Runnable { static final Logger LOG = LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class); private final ConcurrentHashMap.KeySetView inprogressCounter; - private ECReconstructionCoordinator reconstructionCoordinator; - private ECReconstructionCommandInfo reconstructionCommandInfo; + private final ECReconstructionCoordinator reconstructionCoordinator; + private final ECReconstructionCommandInfo reconstructionCommandInfo; + private long deadlineMsSinceEpoch = 0; + private final Clock clock; public ECReconstructionCoordinatorTask( ECReconstructionCoordinator coordinator, ECReconstructionCommandInfo reconstructionCommandInfo, ConcurrentHashMap.KeySetView - inprogressReconstructionCoordinatorCounter) { + inprogressReconstructionCoordinatorCounter, + Clock clock) { this.reconstructionCoordinator = coordinator; this.reconstructionCommandInfo = reconstructionCommandInfo; this.inprogressCounter = inprogressReconstructionCoordinatorCounter; + this.clock = clock; } @Override @@ -69,6 +74,15 @@ public void 
run() { containerID); } try { + if (reconstructionCommandInfo.getDeadline() > 0 + && clock.millis() > reconstructionCommandInfo.getDeadline()) { + LOG.info("Ignoring this reconstruct container command for container" + + " {} since the current time {}ms is past the deadline {}ms", + containerID, clock.millis(), + reconstructionCommandInfo.getDeadline()); + return; + } + SortedMap sourceNodeMap = reconstructionCommandInfo.getSources().stream().collect(Collectors .toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java index e2de7ac6959e..e36636d69356 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java @@ -23,6 +23,7 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Clock; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -39,6 +40,7 @@ public class ECReconstructionSupervisor implements Closeable { private final StateContext context; private final ExecutorService executor; private final ECReconstructionCoordinator reconstructionCoordinator; + private final Clock clock; /** * how many coordinator tasks currently being running. 
*/ @@ -47,18 +49,19 @@ public class ECReconstructionSupervisor implements Closeable { public ECReconstructionSupervisor(ContainerSet containerSet, StateContext context, ExecutorService executor, - ECReconstructionCoordinator coordinator) { + ECReconstructionCoordinator coordinator, Clock clock) { this.containerSet = containerSet; this.context = context; this.executor = executor; this.reconstructionCoordinator = coordinator; this.inProgressReconstrucionCoordinatorCounter = ConcurrentHashMap.newKeySet(); + this.clock = clock; } public ECReconstructionSupervisor(ContainerSet containerSet, StateContext context, int poolSize, - ECReconstructionCoordinator coordinator) { + ECReconstructionCoordinator coordinator, Clock clock) { // TODO: ReplicationSupervisor and this class can be refactored to have a // common interface. this(containerSet, context, @@ -66,7 +69,7 @@ public ECReconstructionSupervisor(ContainerSet containerSet, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("ECContainerReconstructionThread-%d").build()), - coordinator); + coordinator, clock); } public void stop() { @@ -86,7 +89,7 @@ public void addTask(ECReconstructionCommandInfo taskInfo) { .add(taskInfo.getContainerID())) { executor.execute( new ECReconstructionCoordinatorTask(getReconstructionCoordinator(), - taskInfo, inProgressReconstrucionCoordinatorCounter)); + taskInfo, inProgressReconstrucionCoordinatorCounter, clock)); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 5432656e0363..444ab303f518 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -17,6 
+17,7 @@ */ package org.apache.hadoop.ozone.container.replication; +import java.time.Clock; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.ExecutorService; @@ -49,10 +50,12 @@ public class ReplicationSupervisor { private final ContainerReplicator replicator; private final ExecutorService executor; private final StateContext context; + private final Clock clock; private final AtomicLong requestCounter = new AtomicLong(); private final AtomicLong successCounter = new AtomicLong(); private final AtomicLong failureCounter = new AtomicLong(); + private final AtomicLong timeoutCounter = new AtomicLong(); /** * A set of container IDs that are currently being downloaded @@ -64,35 +67,27 @@ public class ReplicationSupervisor { @VisibleForTesting ReplicationSupervisor( ContainerSet containerSet, StateContext context, - ContainerReplicator replicator, ExecutorService executor) { + ContainerReplicator replicator, ExecutorService executor, + Clock clock) { this.containerSet = containerSet; this.replicator = replicator; this.containersInFlight = ConcurrentHashMap.newKeySet(); this.executor = executor; this.context = context; + this.clock = clock; } public ReplicationSupervisor( ContainerSet containerSet, StateContext context, - ContainerReplicator replicator, ReplicationConfig replicationConfig) { - this(containerSet, context, replicator, - replicationConfig.getReplicationMaxStreams()); - } - - public ReplicationSupervisor( - ContainerSet containerSet, StateContext context, - ContainerReplicator replicator, int poolSize) { + ContainerReplicator replicator, ReplicationConfig replicationConfig, + Clock clock) { this(containerSet, context, replicator, new ThreadPoolExecutor( - poolSize, poolSize, 60, TimeUnit.SECONDS, + replicationConfig.getReplicationMaxStreams(), + replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new 
ThreadFactoryBuilder().setDaemon(true) .setNameFormat("ContainerReplicationThread-%d") - .build())); - } - - public ReplicationSupervisor(ContainerSet containerSet, - ContainerReplicator replicator, int poolSize) { - this(containerSet, null, replicator, poolSize); + .build()), clock); } /** @@ -148,6 +143,14 @@ public void run() { try { requestCounter.incrementAndGet(); + if (task.getDeadline() > 0 && clock.millis() > task.getDeadline()) { + LOG.info("Ignoring this replicate container command for container" + + " {} since the current time {}ms is past the deadline {}ms", + containerId, clock.millis(), task.getDeadline()); + timeoutCounter.incrementAndGet(); + return; + } + if (context != null) { DatanodeDetails dn = context.getParent().getDatanodeDetails(); if (dn.getPersistedOpState() != @@ -206,4 +209,9 @@ public long getReplicationSuccessCount() { public long getReplicationFailureCount() { return failureCounter.get(); } + + public long getReplicationTimeoutCount() { + return timeoutCounter.get(); + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index df48abda4f3c..0576308bd2e0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -66,6 +66,9 @@ public void getMetrics(MetricsCollector collector, boolean all) { supervisor.getQueueSize()) .addGauge(Interns.info("numRequestedReplications", "Number of requested replications"), - supervisor.getReplicationRequestCount()); + supervisor.getReplicationRequestCount()) + .addGauge(Interns.info("numTimeoutReplications", + "Number of replication requests timed out before being processed"), + 
supervisor.getReplicationTimeoutCount()); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index e6e0d0526b5b..b194c2e2077a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -22,6 +22,7 @@ import java.util.Objects; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; /** * The task to download a container from the sources. @@ -32,21 +33,39 @@ public class ReplicationTask { private final long containerId; - private List sources; + private final List sources; private final Instant queued = Instant.now(); + private long deadlineMsSinceEpoch = 0; + /** * Counter for the transferred bytes. */ private long transferredBytes; - public ReplicationTask( + public ReplicationTask(ReplicateContainerCommand cmd) { + this.containerId = cmd.getContainerID(); + this.sources = cmd.getSourceDatanodes(); + this.deadlineMsSinceEpoch = cmd.getDeadline(); + } + + /** + * Intended to only be used in tests. + */ + protected ReplicationTask( long containerId, List sources ) { - this.containerId = containerId; - this.sources = sources; + this(new ReplicateContainerCommand(containerId, sources)); + } + + /** + * Returns any deadline set on this task, in milliseconds since the epoch. + * A returned value of zero indicates no deadline. 
+ */ + public long getDeadline() { + return deadlineMsSinceEpoch; } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index 744118e3013f..ab214ef2f670 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -38,6 +38,8 @@ public abstract class SCMCommand implements private String encodedToken = ""; + private long deadlineMsSinceEpoch = 0; + SCMCommand() { this.id = HddsIdFactory.getLongId(); } @@ -88,4 +90,37 @@ public String getEncodedToken() { public void setEncodedToken(String encodedToken) { this.encodedToken = encodedToken; } + + /** + * Allows a deadline to be set on the command. The deadline is set as the + * milliseconds since the epoch when the command must have been completed by. + * It is up to the code processing the command to enforce the deadline by + * calling the hasExpired() method, and the code sending the command to set + * the deadline. The default deadline is zero, which means no deadline. + * @param deadlineMs The ms since epoch when the command must have completed + * by. + */ + public void setDeadline(long deadlineMs) { + this.deadlineMsSinceEpoch = deadlineMs; + } + + /** + * @return The deadline set for this command, or zero if no deadline has been + * set. + */ + public long getDeadline() { + return deadlineMsSinceEpoch; + } + + /** + * If a deadline has been set to a non zero value, test if the current time + * passed is beyond the deadline or not. + * @param currentEpochMs current time in milliseconds since the epoch. + * @return false if there is no deadline, or it has not expired. True if the + * set deadline has expired. 
+ */ + public boolean hasExpired(long currentEpochMs) { + return deadlineMsSinceEpoch > 0 && + currentEpochMs > deadlineMsSinceEpoch; + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java new file mode 100644 index 000000000000..282e00f734a5 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; +import org.apache.ozone.test.TestClock; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.mockito.Mockito; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; + +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static org.mockito.Mockito.times; + +/** + * Test for the DeleteContainerCommandHandler. + */ +public class TestDeleteContainerCommandHandler { + + @Test + public void testExpiredCommandsAreNotProcessed() throws IOException { + TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + DeleteContainerCommandHandler handler = + new DeleteContainerCommandHandler(clock, newDirectExecutorService()); + OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class); + ContainerController controller = Mockito.mock(ContainerController.class); + Mockito.when(ozoneContainer.getController()).thenReturn(controller); + + DeleteContainerCommand command1 = new DeleteContainerCommand(1L); + command1.setDeadline(clock.millis() + 10000); + DeleteContainerCommand command2 = new DeleteContainerCommand(2L); + command2.setDeadline(clock.millis() + 20000); + DeleteContainerCommand command3 = new DeleteContainerCommand(3L); + // No deadline on the 3rd command + + clock.fastForward(15000); + handler.handle(command1, ozoneContainer, null, null); + Assertions.assertEquals(1, handler.getTimeoutCount()); + handler.handle(command2, ozoneContainer, null, null); + handler.handle(command3, ozoneContainer, null, null); + Assertions.assertEquals(1, handler.getTimeoutCount()); + Assertions.assertEquals(3, handler.getInvocationCount()); + Mockito.verify(controller, times(0)) 
+ .deleteContainer(1L, false); + Mockito.verify(controller, times(1)) + .deleteContainer(2L, false); + Mockito.verify(controller, times(1)) + .deleteContainer(3L, false); + } + +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java index c40ceb2ea36c..f0b6ff3c368a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java @@ -21,20 +21,39 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; import java.util.SortedMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; + /** * Tests the ECReconstructionSupervisor. 
*/ public class TestECReconstructionSupervisor { + private TestClock clock; + + @BeforeEach + public void setup() { + clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + } + + @Test public void testAddTaskShouldExecuteTheGivenTask() throws InterruptedException, TimeoutException, IOException { @@ -58,15 +77,61 @@ public void reconstructECContainerGroup(long containerID, super.reconstructECContainerGroup(containerID, repConfig, sourceNodeMap, targetNodeMap); } - }) { + }, clock) { }; - supervisor.addTask( - new ECReconstructionCommandInfo(1, new ECReplicationConfig(3, 2), - new byte[0], ImmutableList.of(), ImmutableList.of())); + ReconstructECContainersCommand command = new ReconstructECContainersCommand( + 1L, ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + supervisor.addTask(new ECReconstructionCommandInfo(command)); runnableInvoked.await(); Assertions.assertEquals(1, supervisor.getInFlightReplications()); holdProcessing.countDown(); GenericTestUtils .waitFor(() -> supervisor.getInFlightReplications() == 0, 100, 15000); } + + @Test + public void testTasksWithDeadlineExceededAreNotRun() throws IOException { + ECReconstructionCoordinator coordinator = + Mockito.mock(ECReconstructionCoordinator.class); + ECReconstructionSupervisor supervisor = + new ECReconstructionSupervisor(null, null, + newDirectExecutorService(), coordinator, clock); + + ReconstructECContainersCommand command = new ReconstructECContainersCommand( + 1L, ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + ECReconstructionCommandInfo task1 = + new ECReconstructionCommandInfo(command); + + command = new ReconstructECContainersCommand( + 2L, ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + command.setDeadline(clock.millis() + 10000); + ECReconstructionCommandInfo task2 = + new ECReconstructionCommandInfo(command); + + command = new ReconstructECContainersCommand( + 3L, 
ImmutableList.of(), ImmutableList.of(), new byte[0], + new ECReplicationConfig(3, 2)); + command.setDeadline(clock.millis() + 20000); + ECReconstructionCommandInfo task3 = + new ECReconstructionCommandInfo(command); + + clock.fastForward(15000); + supervisor.addTask(task1); + supervisor.addTask(task2); + supervisor.addTask(task3); + + // No deadline for container 1, it should run. + Mockito.verify(coordinator, times(1)) + .reconstructECContainerGroup(eq(1L), any(), any(), any()); + // Deadline passed for container 2, it should not run. + Mockito.verify(coordinator, times(0)) + .reconstructECContainerGroup(eq(2L), any(), any(), any()); + // Deadline not passed for container 3, it should run. + Mockito.verify(coordinator, times(1)) + .reconstructECContainerGroup(eq(3L), any(), any(), any()); + } + } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java index 457f0a58ae46..1ec6fecd40e8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.container.replication; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -24,8 +25,10 @@ import java.util.Random; import java.util.UUID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.ozone.common.MonotonicClock; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.junit.jupiter.api.Assertions; @@ -43,6 +46,9 @@ public class 
ReplicationSupervisorScheduling { @Test public void test() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + ReplicationServer.ReplicationConfig replicationConfig + = conf.getObject(ReplicationServer.ReplicationConfig.class); List datanodes = new ArrayList<>(); datanodes.add(MockDatanodeDetails.randomDatanodeDetails()); datanodes.add(MockDatanodeDetails.randomDatanodeDetails()); @@ -69,7 +75,7 @@ public void test() throws InterruptedException { ContainerSet cs = new ContainerSet(1000); - ReplicationSupervisor rs = new ReplicationSupervisor(cs, + ReplicationSupervisor rs = new ReplicationSupervisor(cs, null, //simplified executor emulating the current sequential download + //import. @@ -107,7 +113,7 @@ public void test() throws InterruptedException { } } - }, 10); + }, replicationConfig, new MonotonicClock(ZoneId.systemDefault())); final long start = System.currentTimeMillis(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 51f78440bb4c..6ca50854783a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -20,6 +20,8 @@ import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; +import java.time.ZoneId; import java.util.List; import java.util.UUID; import java.util.concurrent.AbstractExecutorService; @@ -38,7 +40,9 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.ozone.test.GenericTestUtils; +import 
org.apache.ozone.test.TestClock; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -76,6 +80,7 @@ public class TestReplicationSupervisor { private ContainerSet set; private final ContainerLayoutVersion layout; + private TestClock clock; public TestReplicationSupervisor(ContainerLayoutVersion layout) { this.layout = layout; @@ -88,6 +93,7 @@ public static Iterable parameters() { @Before public void setUp() throws Exception { + clock = new TestClock(Instant.now(), ZoneId.systemDefault()); set = new ContainerSet(1000); } @@ -231,7 +237,7 @@ public void slowDownload() { public void testDownloadAndImportReplicatorFailure() { ReplicationSupervisor supervisor = new ReplicationSupervisor(set, null, mutableReplicator, - newDirectExecutorService()); + newDirectExecutorService(), clock); // Mock to fetch an exception in the importContainer method. SimpleContainerDownloader moc = @@ -256,6 +262,39 @@ public void testDownloadAndImportReplicatorFailure() { .contains("Container 1 replication was unsuccessful.")); } + @Test + public void testTaskBeyondDeadline() { + ReplicationSupervisor supervisor = + supervisorWithReplicator(FakeReplicator::new); + + ReplicateContainerCommand cmd = new ReplicateContainerCommand(1L, + emptyList()); + cmd.setDeadline(clock.millis() + 10000); + ReplicationTask task1 = new ReplicationTask(cmd); + cmd = new ReplicateContainerCommand(2L, emptyList()); + cmd.setDeadline(clock.millis() + 20000); + ReplicationTask task2 = new ReplicationTask(cmd); + cmd = new ReplicateContainerCommand(3L, emptyList()); + // No deadline set + ReplicationTask task3 = new ReplicationTask(cmd); + // no deadline set + + clock.fastForward(15000); + + supervisor.addTask(task1); + supervisor.addTask(task2); + supervisor.addTask(task3); + + Assert.assertEquals(3, supervisor.getReplicationRequestCount()); + Assert.assertEquals(2, supervisor.getReplicationSuccessCount()); + Assert.assertEquals(0, supervisor.getReplicationFailureCount()); + 
Assert.assertEquals(0, supervisor.getInFlightReplications()); + Assert.assertEquals(0, supervisor.getQueueSize()); + Assert.assertEquals(1, supervisor.getReplicationTimeoutCount()); + Assert.assertEquals(2, set.containerCount()); + + } + private ReplicationSupervisor supervisorWithReplicator( Function replicatorFactory) { return supervisorWith(replicatorFactory, newDirectExecutorService()); @@ -265,7 +304,8 @@ private ReplicationSupervisor supervisorWith( Function replicatorFactory, ExecutorService executor) { ReplicationSupervisor supervisor = - new ReplicationSupervisor(set, null, mutableReplicator, executor); + new ReplicationSupervisor(set, null, mutableReplicator, executor, + clock); replicatorRef.set(replicatorFactory.apply(supervisor)); return supervisor; } diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 6465eeb40b47..7f8476d99500 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -343,6 +343,7 @@ message SCMCommandProto { // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM. 
optional int64 term = 15; optional string encodedToken = 16; + optional int64 deadlineMsSinceEpoch = 17; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index f78536912ebc..261803a3babf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigType; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.PostConstruct; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; @@ -417,6 +418,8 @@ public void sendDatanodeCommand(SCMCommand command, LOG.info("Sending command of type {} for container {} to {}", command.getType(), containerInfo, target); command.setTerm(getScmTerm()); + command.setDeadline(clock.millis() + + Math.round(rmConf.eventTimeout * rmConf.commandDeadlineFactor)); final CommandForDatanode datanodeCommand = new CommandForDatanode<>(target.getUuid(), command); eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); @@ -765,6 +768,30 @@ public void setEventTimeout(Duration timeout) { this.eventTimeout = timeout.toMillis(); } + /** + * Deadline which should be set on commands sent from ReplicationManager + * to the datanodes, as a percentage of the event.timeout. If the command + * has not been processed on the datanode by this time, it will be dropped + * by the datanode and Replication Manager will need to resend it. 
+ */ + @Config(key = "command.deadline.factor", + type = ConfigType.DOUBLE, + defaultValue = "0.9", + tags = {SCM, OZONE}, + description = "Fraction of the hdds.scm.replication.event.timeout " + + "from the current time which should be set as a deadline for " + + "commands sent from ReplicationManager to datanodes. " + + "Commands which are not processed before this deadline will be " + + "dropped by the datanodes. Should be a value > 0 and <= 1.") + private double commandDeadlineFactor = 0.9; + public double getCommandDeadlineFactor() { + return commandDeadlineFactor; + } + + public void setCommandDeadlineFactor(double val) { + commandDeadlineFactor = val; + } + /** * The number of container replica which must be available for a node to * enter maintenance. @@ -811,6 +838,15 @@ public void setMaintenanceReplicaMinimum(int replicaCount) { ) private int maintenanceRemainingRedundancy = 1; + @PostConstruct + public void validate() { + if (!(commandDeadlineFactor > 0) || (commandDeadlineFactor > 1)) { + throw new IllegalArgumentException("command.deadline.factor is set to " + + commandDeadlineFactor + + " and must be greater than 0 and less than equal to 1"); + } + } + public void setMaintenanceRemainingRedundancy(int redundancy) { this.maintenanceRemainingRedundancy = redundancy; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index f2654fac027b..467adbe23a4d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -272,7 +272,7 @@ public SCMHeartbeatResponseProto sendHeartbeat( SCMHeartbeatRequestProto heartbeat) throws IOException, TimeoutException { List cmdResponses = new ArrayList<>(); for (SCMCommand cmd : 
heartbeatDispatcher.dispatch(heartbeat)) { - cmdResponses.add(getCommandResponse(cmd)); + cmdResponses.add(getCommandResponse(cmd, scm)); } boolean auditSuccess = true; Map auditMap = Maps.newHashMap(); @@ -305,14 +305,17 @@ public SCMHeartbeatResponseProto sendHeartbeat( * @throws IOException */ @VisibleForTesting - public SCMCommandProto getCommandResponse(SCMCommand cmd) - throws IOException, TimeoutException { + public static SCMCommandProto getCommandResponse(SCMCommand cmd, + OzoneStorageContainerManager scm) throws IOException, TimeoutException { SCMCommandProto.Builder builder = SCMCommandProto.newBuilder() .setEncodedToken(cmd.getEncodedToken()); // In HA mode, it is the term of current leader SCM. // In non-HA mode, it is the default value 0. builder.setTerm(cmd.getTerm()); + // The default deadline is 0, which means no deadline. Individual commands + // may have a deadline set. + builder.setDeadlineMsSinceEpoch(cmd.getDeadline()); switch (cmd.getType()) { case reregisterCommand: diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 82f971be799e..92ec4995706f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -477,6 +477,15 @@ public void testSendDatanodeReplicateCommand() throws NotLeaderException { replicationManager.sendDatanodeCommand(command, containerInfo, target); + // Ensure that the command deadline is set to current time + // + evenTime * factor + ReplicationManager.ReplicationManagerConfiguration rmConf = configuration + .getObject(ReplicationManager.ReplicationManagerConfiguration.class); + long expectedDeadline = clock.millis() + + 
Math.round(rmConf.getEventTimeout() * + rmConf.getCommandDeadlineFactor()); + Assert.assertEquals(expectedDeadline, command.getDeadline()); + List ops = containerReplicaPendingOps.getPendingOps( containerInfo.containerID()); Mockito.verify(eventPublisher).fireEvent(any(), any()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java new file mode 100644 index 000000000000..cfa34fc445a0 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.TimeoutException; + +/** + * Test for StorageContainerDatanodeProtocolProtos. + */ +public class TestSCMDatanodeProtocolServer { + + @Test + public void ensureTermAndDeadlineOnCommands() + throws IOException, TimeoutException { + OzoneStorageContainerManager scm = + Mockito.mock(OzoneStorageContainerManager.class); + + ReplicateContainerCommand command = new ReplicateContainerCommand(1L, + Collections.emptyList()); + command.setTerm(5L); + command.setDeadline(1234L); + StorageContainerDatanodeProtocolProtos.SCMCommandProto proto = + SCMDatanodeProtocolServer.getCommandResponse(command, scm); + + Assert.assertEquals(StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.replicateContainerCommand, proto.getCommandType()); + Assert.assertEquals(5L, proto.getTerm()); + Assert.assertEquals(1234L, proto.getDeadlineMsSinceEpoch()); + } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java index a7d332e7839a..ebfa2d863765 100644 --- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.ozone.common.MonotonicClock; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Handler; @@ -37,9 +38,11 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.ContainerReplicator; import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator; +import org.apache.hadoop.ozone.container.replication.ReplicationServer; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.replication.ReplicationTask; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.jetbrains.annotations.NotNull; import picocli.CommandLine.Command; import picocli.CommandLine.Option; @@ -48,6 +51,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -123,8 +127,9 @@ public Void call() throws Exception { //if datanode is specified, replicate only container if it has a //replica. 
if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) { - replicationTasks.add(new ReplicationTask(container.getContainerID(), - datanodesWithContainer)); + replicationTasks.add(new ReplicationTask( + new ReplicateContainerCommand(container.getContainerID(), + datanodesWithContainer))); } } @@ -203,7 +208,11 @@ private void initializeReplicationSupervisor(ConfigurationSource conf) new SimpleContainerDownloader(conf, null), new TarContainerPacker()); - supervisor = new ReplicationSupervisor(containerSet, replicator, 10); + ReplicationServer.ReplicationConfig replicationConfig + = conf.getObject(ReplicationServer.ReplicationConfig.class); + supervisor = new ReplicationSupervisor(containerSet, null, + replicator, replicationConfig, + new MonotonicClock(ZoneId.systemDefault())); } private void replicateContainer(long counter) throws Exception {