diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 44c783846ad1..e7231f03a9bc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -66,12 +66,14 @@ public void handle(SCMCommand command, OzoneContainer container, final List sourceDatanodes = replicateCommand.getSourceDatanodes(); final long containerID = replicateCommand.getContainerID(); + final long timeoutMs = replicateCommand.getTimeoutMs(); Preconditions.checkArgument(sourceDatanodes.size() > 0, "Replication command is received for container %s " + "without source datanodes.", containerID); - supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes)); + supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes, + timeoutMs)); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 5432656e0363..14e8718d3dae 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.ozone.container.replication; +import java.time.Duration; +import java.time.Instant; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.ExecutorService; @@ -53,6 +55,7 @@ public class ReplicationSupervisor { private final AtomicLong requestCounter = new AtomicLong(); private final AtomicLong successCounter = new AtomicLong(); private final AtomicLong failureCounter = new AtomicLong(); + private final AtomicLong timeoutCounter = new AtomicLong(); /** * A set of container IDs that are currently being downloaded @@ -147,6 +150,17 @@ public void run() { final Long containerId = task.getContainerId(); try { requestCounter.incrementAndGet(); + if (task.getTimeoutMs() != 0) { + long msInQueue = + Duration.between(task.getQueued(), Instant.now()).toMillis(); + if (msInQueue > task.getTimeoutMs()) { + LOG.info("Ignore this replicate container command for container" + + " {} since queueTime larger then timeout {}ms", + containerId, task.getTimeoutMs()); + timeoutCounter.incrementAndGet(); + return; + } + } if (context != null) { DatanodeDetails dn = context.getParent().getDatanodeDetails(); @@ -206,4 +220,8 @@ public long getReplicationSuccessCount() { public long getReplicationFailureCount() { return failureCounter.get(); } + + public long getReplicationTimeoutCount() { + return timeoutCounter.get(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index df48abda4f3c..9c5eba40858a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -66,6 +66,9 @@ public void getMetrics(MetricsCollector collector, boolean all) { supervisor.getQueueSize()) .addGauge(Interns.info("numRequestedReplications", "Number of requested replications"), - supervisor.getReplicationRequestCount()); + supervisor.getReplicationRequestCount()) + .addGauge(Interns.info("numTimeoutReplications", + "Number of timeout replications in queue"), + supervisor.getReplicationTimeoutCount()); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index e6e0d0526b5b..8043db990d10 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -34,6 +34,8 @@ public class ReplicationTask { private List sources; + private final long timeoutMs; + private final Instant queued = Instant.now(); /** @@ -43,10 +45,18 @@ public class ReplicationTask { public ReplicationTask( long containerId, - List sources + List sources) { + this(containerId, sources, 0L); + } + + public ReplicationTask( + long containerId, + List sources, + long timeoutMs ) { this.containerId = containerId; this.sources = sources; + this.timeoutMs = timeoutMs; } @Override @@ -74,6 +84,9 @@ public List getSources() { return sources; } + public long getTimeoutMs() { + return timeoutMs; + } public Status getStatus() { return status; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java index e663bed794f3..24146c6051cd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java @@ -41,20 +41,28 @@ public class ReplicateContainerCommand private final long containerID; private final List sourceDatanodes; + private final long timeoutMs; public ReplicateContainerCommand(long containerID, List sourceDatanodes) { + this(containerID, sourceDatanodes, 0L); + } + + public ReplicateContainerCommand(long containerID, + List sourceDatanodes, long timeoutMs) { super(); this.containerID = containerID; this.sourceDatanodes = sourceDatanodes; + this.timeoutMs = timeoutMs; } // Should be called only for protobuf conversion public ReplicateContainerCommand(long containerID, - List sourceDatanodes, long id) { + List sourceDatanodes, long id, long timeoutMs) { super(id); this.containerID = containerID; this.sourceDatanodes = sourceDatanodes; + this.timeoutMs = timeoutMs; } @Override @@ -66,7 +74,8 @@ public Type getType() { public ReplicateContainerCommandProto getProto() { Builder builder = ReplicateContainerCommandProto.newBuilder() .setCmdId(getId()) - .setContainerID(containerID); + .setContainerID(containerID) + .setTimeoutMs(timeoutMs); for (DatanodeDetails dd : sourceDatanodes) { builder.addSources(dd.getProtoBufMessage()); } @@ -84,7 +93,7 @@ public static ReplicateContainerCommand getFromProtobuf( .collect(Collectors.toList()); return new ReplicateContainerCommand(protoMessage.getContainerID(), - datanodeDetails, protoMessage.getCmdId()); + datanodeDetails, protoMessage.getCmdId(), protoMessage.getTimeoutMs()); } @@ -95,4 +104,8 @@ public long getContainerID() { public List getSourceDatanodes() { return sourceDatanodes; } + + public long getTimeoutMs() { + return timeoutMs; + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 8078fc25c897..a33f2a9bdb3e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -227,6 +227,34 @@ public void slowDownload() { } } + @Test + public void testTimeoutTask() { + // GIVEN + ReplicationSupervisor supervisor = supervisorWith(__ -> slowReplicator, + new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>())); + + try { + //WHEN + supervisor.addTask(new ReplicationTask(1L, emptyList())); + supervisor.addTask(new ReplicationTask(2L, emptyList(), 100L)); + supervisor.addTask(new ReplicationTask(3L, emptyList(), 100L)); + + //THEN + Assert.assertEquals(3, supervisor.getInFlightReplications()); + Assert.assertEquals(2, supervisor.getQueueSize()); + // Sleep 2s, wait all tasks processed + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + } + Assert.assertEquals(0, supervisor.getInFlightReplications()); + Assert.assertEquals(0, supervisor.getQueueSize()); + Assert.assertEquals(2, supervisor.getReplicationTimeoutCount()); + } finally { + supervisor.stop(); + } + } @Test public void testDownloadAndImportReplicatorFailure() { ReplicationSupervisor supervisor = diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index e998b846df0e..2e8053d2584b 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -409,6 +409,7 @@ message ReplicateContainerCommandProto { required int64 containerID = 1; repeated DatanodeDetailsProto sources = 2; required int64 cmdId = 3; + optional int64 timeoutMs = 4; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java index 47ac3d354154..1d5580f8520e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java @@ -1449,7 +1449,8 @@ private void sendReplicateCommand(final ContainerInfo container, final ContainerID id = container.containerID(); final ReplicateContainerCommand replicateCommand = - new ReplicateContainerCommand(id.getId(), sources); + new ReplicateContainerCommand(id.getId(), sources, + rmConf.getEventTimeout()); inflightReplication.computeIfAbsent(id, k -> new ArrayList<>()); sendAndTrackDatanodeCommand(datanode, replicateCommand, action -> inflightReplication.get(id).add(action));