@@ -66,12 +66,14 @@ public void handle(SCMCommand command, OzoneContainer container,
final List<DatanodeDetails> sourceDatanodes =
replicateCommand.getSourceDatanodes();
final long containerID = replicateCommand.getContainerID();
final long timeoutMs = replicateCommand.getTimeoutMs();

Preconditions.checkArgument(sourceDatanodes.size() > 0,
"Replication command is received for container %s "
+ "without source datanodes.", containerID);

supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes));
supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes,
timeoutMs));
}

@Override
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.container.replication;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
@@ -53,6 +55,7 @@ public class ReplicationSupervisor {
private final AtomicLong requestCounter = new AtomicLong();
private final AtomicLong successCounter = new AtomicLong();
private final AtomicLong failureCounter = new AtomicLong();
private final AtomicLong timeoutCounter = new AtomicLong();

/**
* A set of container IDs that are currently being downloaded
@@ -147,6 +150,17 @@ public void run() {
final Long containerId = task.getContainerId();
try {
requestCounter.incrementAndGet();
if (task.getTimeoutMs() != 0) {
long msInQueue =
Duration.between(task.getQueued(), Instant.now()).toMillis();

Contributor:
Rather than calling Instant.now here, can we inject an instance of MonotonicClock and use it to get the time? This will make tests easier and avoid sleep calls in the tests. This is something we have used in other places in the code before.
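
A rough sketch of what this suggestion could look like, not part of this PR: java.time.Clock stands in for the MonotonicClock mentioned above, and the constructor and hasTimedOut method are illustrative names only.

    import java.time.Clock;
    import java.time.Duration;

    public class ReplicationSupervisor {
      private final Clock clock;  // injected; production wiring passes the system clock

      public ReplicationSupervisor(/* existing arguments, */ Clock clock) {
        this.clock = clock;
      }

      // Same computation as the diff above, but the time source is injectable.
      private boolean hasTimedOut(ReplicationTask task) {
        if (task.getTimeoutMs() == 0) {
          return false;  // 0 means the task never expires
        }
        long msInQueue =
            Duration.between(task.getQueued(), clock.instant()).toMillis();
        return msInQueue > task.getTimeoutMs();
      }
    }

Tests could then supply Clock.fixed(...) or a mutable test clock instead of sleeping.
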
Contributor:
There are 2 queues in the DN: the main command queue and then the replication queue. The time set for getQueued() is the time the command is placed onto the replication queue. However, there are some delays in the system here, e.g.:

1. The command is created in SCM and queued on SCM. It could be a full DN heartbeat interval before it gets picked up.

2. The command lands on the DN command queue. Something may hold it up before it gets onto the replication queue.

I think this would mean the pending operation in SCM could expire before the DN command does, which gives SCM a chance to schedule another command before this one expires on the DN.

I think it might be better if we set the expiry time on the command (in ms since epoch) when it is created in SCM to match the SCM timeout, so that it avoids any of these delays. In fact, it may make sense to make the DN command expiry a little less than the SCM timeout, so the DN command tends to expire slightly earlier than the SCM timeout.
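
A rough sketch of this alternative, not part of this PR; deadlineEpochMs, DEADLINE_MARGIN_MS and getDeadlineEpochMs are hypothetical names, while rmConf.getEventTimeout() is the SCM-side timeout already used later in this patch.

    // SCM side: stamp an absolute expiry on the command when it is created,
    // slightly earlier than the SCM timeout so the DN tends to give up first.
    long deadlineEpochMs = System.currentTimeMillis()
        + rmConf.getEventTimeout() - DEADLINE_MARGIN_MS;
    ReplicateContainerCommand command =
        new ReplicateContainerCommand(containerID, sources, deadlineEpochMs);

    // Datanode side: heartbeat and command-queue delays no longer matter,
    // because the expiry is compared against "now" rather than queue time.
    if (task.getDeadlineEpochMs() != 0
        && System.currentTimeMillis() > task.getDeadlineEpochMs()) {
      timeoutCounter.incrementAndGet();
      return;  // skip the stale task; SCM can reschedule it if still needed
    }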

if (msInQueue > task.getTimeoutMs()) {
LOG.info("Ignore this replicate container command for container" +
" {} since queueTime larger then timeout {}ms",
containerId, task.getTimeoutMs());
timeoutCounter.incrementAndGet();
return;
}
}

if (context != null) {
DatanodeDetails dn = context.getParent().getDatanodeDetails();
@@ -206,4 +220,8 @@ public long getReplicationSuccessCount() {
public long getReplicationFailureCount() {
return failureCounter.get();
}

public long getReplicationTimeoutCount() {
return timeoutCounter.get();
}
}
@@ -66,6 +66,9 @@ public void getMetrics(MetricsCollector collector, boolean all) {
supervisor.getQueueSize())
.addGauge(Interns.info("numRequestedReplications",
"Number of requested replications"),
supervisor.getReplicationRequestCount());
supervisor.getReplicationRequestCount())
.addGauge(Interns.info("numTimeoutReplications",
"Number of timeout replications in queue"),
supervisor.getReplicationTimeoutCount());
}
}
@@ -34,6 +34,8 @@ public class ReplicationTask {

private List<DatanodeDetails> sources;

private final long timeoutMs;

private final Instant queued = Instant.now();

/**
@@ -43,10 +45,18 @@ public class ReplicationTask {

public ReplicationTask(
long containerId,
List<DatanodeDetails> sources
List<DatanodeDetails> sources) {
this(containerId, sources, 0L);
}

public ReplicationTask(
long containerId,
List<DatanodeDetails> sources,
long timeoutMs
) {
this.containerId = containerId;
this.sources = sources;
this.timeoutMs = timeoutMs;
}
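
For reference, the delegating constructor keeps existing callers working: a task created without a timeout gets 0L, which the supervisor's getTimeoutMs() != 0 check treats as "never expires". A small illustrative snippet (the sources list is a placeholder):

    List<DatanodeDetails> sources = Collections.emptyList();             // placeholder
    ReplicationTask unbounded = new ReplicationTask(1L, sources);        // no timeout
    ReplicationTask bounded = new ReplicationTask(2L, sources, 30_000L); // 30 s timeout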

@Override
@@ -74,6 +84,9 @@ public List<DatanodeDetails> getSources() {
return sources;
}

public long getTimeoutMs() {
return timeoutMs;
}
public Status getStatus() {
return status;
}
@@ -41,20 +41,28 @@ public class ReplicateContainerCommand

private final long containerID;
private final List<DatanodeDetails> sourceDatanodes;
private final long timeoutMs;
Contributor:
If we go with my earlier suggestion of setting the "run by time" in SCM on the command, perhaps rename "timeout" to something like "deadlineEpochMs" or "expiryEpochMs" throughout this method.


public ReplicateContainerCommand(long containerID,
List<DatanodeDetails> sourceDatanodes) {
this(containerID, sourceDatanodes, 0L);
}

public ReplicateContainerCommand(long containerID,
List<DatanodeDetails> sourceDatanodes, long timeoutMs) {
super();
this.containerID = containerID;
this.sourceDatanodes = sourceDatanodes;
this.timeoutMs = timeoutMs;
}

// Should be called only for protobuf conversion
public ReplicateContainerCommand(long containerID,
List<DatanodeDetails> sourceDatanodes, long id) {
List<DatanodeDetails> sourceDatanodes, long id, long timeoutMs) {
super(id);
this.containerID = containerID;
this.sourceDatanodes = sourceDatanodes;
this.timeoutMs = timeoutMs;
}

@Override
@@ -66,7 +74,8 @@ public Type getType() {
public ReplicateContainerCommandProto getProto() {
Builder builder = ReplicateContainerCommandProto.newBuilder()
.setCmdId(getId())
.setContainerID(containerID);
.setContainerID(containerID)
.setTimeoutMs(timeoutMs);
for (DatanodeDetails dd : sourceDatanodes) {
builder.addSources(dd.getProtoBufMessage());
}
@@ -84,7 +93,7 @@ public static ReplicateContainerCommand getFromProtobuf(
.collect(Collectors.toList());

return new ReplicateContainerCommand(protoMessage.getContainerID(),
datanodeDetails, protoMessage.getCmdId());
datanodeDetails, protoMessage.getCmdId(), protoMessage.getTimeoutMs());

}

@@ -95,4 +104,8 @@ public long getContainerID() {
public List<DatanodeDetails> getSourceDatanodes() {
return sourceDatanodes;
}

public long getTimeoutMs() {
return timeoutMs;
}
}
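
A short, hypothetical round-trip check, not part of this PR, illustrating why the new field stays wire-compatible: the proto field is optional, so a command serialized by an older SCM deserializes with getTimeoutMs() == 0, which the datanode treats as "no timeout".

    ReplicateContainerCommand original =
        new ReplicateContainerCommand(1L, sources, 30_000L);  // sources: any List<DatanodeDetails>
    ReplicateContainerCommandProto proto = original.getProto();
    ReplicateContainerCommand decoded =
        ReplicateContainerCommand.getFromProtobuf(proto);
    assert decoded.getTimeoutMs() == 30_000L;  // the timeout survives the round trip
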
@@ -227,6 +227,34 @@ public void slowDownload() {
}
}

@Test
public void testTimeoutTask() {
// GIVEN
ReplicationSupervisor supervisor = supervisorWith(__ -> slowReplicator,
new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()));

try {
//WHEN
supervisor.addTask(new ReplicationTask(1L, emptyList()));
supervisor.addTask(new ReplicationTask(2L, emptyList(), 100L));
supervisor.addTask(new ReplicationTask(3L, emptyList(), 100L));

//THEN
Assert.assertEquals(3, supervisor.getInFlightReplications());
Assert.assertEquals(2, supervisor.getQueueSize());
// Sleep 2s to wait for all tasks to be processed
try {
Thread.sleep(2000);
Contributor:
If we inject the MonotonicClock into the Supervisor, then we can create it with an instance of TestClock that we can use to avoid the sleep calls and make the tests faster.

} catch (InterruptedException e) {
}
Assert.assertEquals(0, supervisor.getInFlightReplications());
Assert.assertEquals(0, supervisor.getQueueSize());
Assert.assertEquals(2, supervisor.getReplicationTimeoutCount());
} finally {
supervisor.stop();
}
}
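
Following the TestClock suggestion above, a rough sketch of how the sleep could be removed, assuming the supervisor accepts an injected clock; the MutableClock helper and the extra supervisorWith parameter are assumptions, not part of this PR.

    // Hypothetical clock-aware setup for the same scenario as testTimeoutTask.
    MutableClock testClock = new MutableClock(Instant.now(), ZoneOffset.UTC);
    ReplicationSupervisor supervisor = supervisorWith(__ -> slowReplicator,
        new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()), testClock);

    supervisor.addTask(new ReplicationTask(2L, emptyList(), 100L));

    // Instead of Thread.sleep(2000), advance the injected clock past the
    // task's 100 ms timeout; when the task is dequeued, the supervisor
    // sees it as expired and increments the timeout counter.
    testClock.fastForward(Duration.ofMillis(200));
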
@Test
public void testDownloadAndImportReplicatorFailure() {
ReplicationSupervisor supervisor =
Expand Down
@@ -409,6 +409,7 @@ message ReplicateContainerCommandProto {
required int64 containerID = 1;
repeated DatanodeDetailsProto sources = 2;
required int64 cmdId = 3;
optional int64 timeoutMs = 4;
Contributor:
If we rename the timeout to expiry or deadline, then change it here too.

}

/**
Expand Down
@@ -1449,7 +1449,8 @@ private void sendReplicateCommand(final ContainerInfo container,

final ContainerID id = container.containerID();
final ReplicateContainerCommand replicateCommand =
new ReplicateContainerCommand(id.getId(), sources);
new ReplicateContainerCommand(id.getId(), sources,
rmConf.getEventTimeout());
inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
sendAndTrackDatanodeCommand(datanode, replicateCommand,
action -> inflightReplication.get(id).add(action));