Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.time.Clock;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -40,6 +42,7 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
Expand Down Expand Up @@ -139,6 +142,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
this.conf = conf;
this.datanodeDetails = datanodeDetails;

Clock clock = new MonotonicClock(ZoneId.systemDefault());
// Expected to be initialized already.
layoutStorage = new DatanodeLayoutStorage(conf,
datanodeDetails.getUuidString());
Expand Down Expand Up @@ -180,7 +184,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
conf.getObject(ReplicationConfig.class);
supervisor =
new ReplicationSupervisor(container.getContainerSet(), context,
replicatorMetrics, replicationConfig);
replicatorMetrics, replicationConfig, clock);

replicationSupervisorMetrics =
ReplicationSupervisorMetrics.create(supervisor);
Expand All @@ -193,7 +197,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
ecReconstructionSupervisor =
new ECReconstructionSupervisor(container.getContainerSet(), context,
replicationConfig.getReplicationMaxStreams(),
ecReconstructionCoordinator);
ecReconstructionCoordinator, clock);


// When we add new handlers just adding a new handler here should do the
Expand All @@ -207,7 +211,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.addHandler(new ReconstructECContainersCommandHandler(conf,
ecReconstructionSupervisor))
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads()))
dnConf.getContainerDeleteThreads(), clock))
.addHandler(new ClosePipelineCommandHandler())
.addHandler(new CreatePipelineCommandHandler(conf))
.addHandler(new SetNodeOperationalStateCommandHandler(conf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Clock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -48,15 +49,22 @@ public class DeleteContainerCommandHandler implements CommandHandler {
LoggerFactory.getLogger(DeleteContainerCommandHandler.class);

private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicInteger timeoutCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);
private final ExecutorService executor;
private final Clock clock;

public DeleteContainerCommandHandler(int threadPoolSize) {
this.executor = Executors.newFixedThreadPool(
public DeleteContainerCommandHandler(int threadPoolSize, Clock clock) {
this(clock, Executors.newFixedThreadPool(
threadPoolSize, new ThreadFactoryBuilder()
.setNameFormat("DeleteContainerThread-%d").build());
.setNameFormat("DeleteContainerThread-%d").build()));
}

protected DeleteContainerCommandHandler(Clock clock,
ExecutorService executor) {
this.executor = executor;
this.clock = clock;
}
@Override
public void handle(final SCMCommand command,
final OzoneContainer ozoneContainer,
Expand All @@ -69,6 +77,14 @@ public void handle(final SCMCommand command,
final long startTime = Time.monotonicNow();
invocationCount.incrementAndGet();
try {
if (command.hasExpired(clock.millis())) {
LOG.info("Not processing the delete container command for " +
"container {} as the current time {}ms is after the command " +
"deadline {}ms", deleteContainerCommand.getContainerID(),
clock.millis(), command.getDeadline());
timeoutCount.incrementAndGet();
return;
}
controller.deleteContainer(deleteContainerCommand.getContainerID(),
deleteContainerCommand.isForce());
} catch (IOException e) {
Expand All @@ -94,6 +110,10 @@ public int getInvocationCount() {
return this.invocationCount.get();
}

public int getTimeoutCount() {
return this.timeoutCount.get();
}

@Override
public long getAverageRunTime() {
final int invocations = invocationCount.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ public void handle(SCMCommand command, OzoneContainer container,
ReconstructECContainersCommand ecContainersCommand =
(ReconstructECContainersCommand) command;
ECReconstructionCommandInfo reconstructionCommandInfo =
new ECReconstructionCommandInfo(ecContainersCommand.getContainerID(),
ecContainersCommand.getEcReplicationConfig(),
ecContainersCommand.getMissingContainerIndexes(),
ecContainersCommand.getSources(),
ecContainersCommand.getTargetDatanodes());
new ECReconstructionCommandInfo(ecContainersCommand);
this.supervisor.addTask(reconstructionCommandInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void handle(SCMCommand command, OzoneContainer container,
"Replication command is received for container %s "
+ "without source datanodes.", containerID);

supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes));
ReplicationTask task = new ReplicationTask(replicateCommand);
supervisor.addTask(task);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ private void processResponse(SCMHeartbeatResponseProto response,
* Common processing for SCM commands.
* - set term
* - set encoded token
* - any deadline which is relevant to the command
* - add to context's queue
*/
private void processCommonCommand(
Expand All @@ -436,6 +437,9 @@ private void processCommonCommand(
if (response.hasEncodedToken()) {
cmd.setEncodedToken(response.getEncodedToken());
}
if (response.hasDeadlineMsSinceEpoch()) {
cmd.setDeadline(response.getDeadlineMsSinceEpoch());
}
context.addCommand(cmd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,21 @@ public class ECReconstructionCommandInfo {
private List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
sources;
private List<DatanodeDetails> targetDatanodes;
private long deadlineMsSinceEpoch = 0;

public ECReconstructionCommandInfo(long containerID,
ECReplicationConfig ecReplicationConfig, byte[] missingContainerIndexes,
List<DatanodeDetailsAndReplicaIndex> sources,
List<DatanodeDetails> targetDatanodes) {
this.containerID = containerID;
this.ecReplicationConfig = ecReplicationConfig;
public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) {
this.containerID = cmd.getContainerID();
this.ecReplicationConfig = cmd.getEcReplicationConfig();
this.missingContainerIndexes =
Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length);
this.sources = sources;
this.targetDatanodes = targetDatanodes;
Arrays.copyOf(cmd.getMissingContainerIndexes(),
cmd.getMissingContainerIndexes().length);
this.sources = cmd.getSources();
this.targetDatanodes = cmd.getTargetDatanodes();
this.deadlineMsSinceEpoch = cmd.getDeadline();
}

public long getDeadline() {
return deadlineMsSinceEpoch;
}

public long getContainerID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Clock;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -36,17 +37,21 @@ public class ECReconstructionCoordinatorTask implements Runnable {
static final Logger LOG =
LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
private final ConcurrentHashMap.KeySetView<Object, Boolean> inprogressCounter;
private ECReconstructionCoordinator reconstructionCoordinator;
private ECReconstructionCommandInfo reconstructionCommandInfo;
private final ECReconstructionCoordinator reconstructionCoordinator;
private final ECReconstructionCommandInfo reconstructionCommandInfo;
private long deadlineMsSinceEpoch = 0;
private final Clock clock;

public ECReconstructionCoordinatorTask(
ECReconstructionCoordinator coordinator,
ECReconstructionCommandInfo reconstructionCommandInfo,
ConcurrentHashMap.KeySetView<Object, Boolean>
inprogressReconstructionCoordinatorCounter) {
inprogressReconstructionCoordinatorCounter,
Clock clock) {
this.reconstructionCoordinator = coordinator;
this.reconstructionCommandInfo = reconstructionCommandInfo;
this.inprogressCounter = inprogressReconstructionCoordinatorCounter;
this.clock = clock;
}

@Override
Expand All @@ -69,6 +74,15 @@ public void run() {
containerID);
}
try {
if (reconstructionCommandInfo.getDeadline() > 0
&& clock.millis() > reconstructionCommandInfo.getDeadline()) {
LOG.info("Ignoring this reconstruct container command for container" +
" {} since the current time {}ms is past the deadline {}ms",
containerID, clock.millis(),
reconstructionCommandInfo.getDeadline());
return;
}

SortedMap<Integer, DatanodeDetails> sourceNodeMap =
reconstructionCommandInfo.getSources().stream().collect(Collectors
.toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.time.Clock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -39,6 +40,7 @@ public class ECReconstructionSupervisor implements Closeable {
private final StateContext context;
private final ExecutorService executor;
private final ECReconstructionCoordinator reconstructionCoordinator;
private final Clock clock;
/**
* how many coordinator tasks currently being running.
*/
Expand All @@ -47,26 +49,27 @@ public class ECReconstructionSupervisor implements Closeable {

public ECReconstructionSupervisor(ContainerSet containerSet,
StateContext context, ExecutorService executor,
ECReconstructionCoordinator coordinator) {
ECReconstructionCoordinator coordinator, Clock clock) {
this.containerSet = containerSet;
this.context = context;
this.executor = executor;
this.reconstructionCoordinator = coordinator;
this.inProgressReconstrucionCoordinatorCounter =
ConcurrentHashMap.newKeySet();
this.clock = clock;
}

public ECReconstructionSupervisor(ContainerSet containerSet,
StateContext context, int poolSize,
ECReconstructionCoordinator coordinator) {
ECReconstructionCoordinator coordinator, Clock clock) {
// TODO: ReplicationSupervisor and this class can be refactored to have a
// common interface.
this(containerSet, context,
new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ECContainerReconstructionThread-%d").build()),
coordinator);
coordinator, clock);
}

public void stop() {
Expand All @@ -86,7 +89,7 @@ public void addTask(ECReconstructionCommandInfo taskInfo) {
.add(taskInfo.getContainerID())) {
executor.execute(
new ECReconstructionCoordinatorTask(getReconstructionCoordinator(),
taskInfo, inProgressReconstrucionCoordinatorCounter));
taskInfo, inProgressReconstrucionCoordinatorCounter, clock));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.replication;

import java.time.Clock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -49,10 +50,12 @@ public class ReplicationSupervisor {
private final ContainerReplicator replicator;
private final ExecutorService executor;
private final StateContext context;
private final Clock clock;

private final AtomicLong requestCounter = new AtomicLong();
private final AtomicLong successCounter = new AtomicLong();
private final AtomicLong failureCounter = new AtomicLong();
private final AtomicLong timeoutCounter = new AtomicLong();

/**
* A set of container IDs that are currently being downloaded
Expand All @@ -64,35 +67,27 @@ public class ReplicationSupervisor {
@VisibleForTesting
ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
ContainerReplicator replicator, ExecutorService executor) {
ContainerReplicator replicator, ExecutorService executor,
Clock clock) {
this.containerSet = containerSet;
this.replicator = replicator;
this.containersInFlight = ConcurrentHashMap.newKeySet();
this.executor = executor;
this.context = context;
this.clock = clock;
}

public ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
ContainerReplicator replicator, ReplicationConfig replicationConfig) {
this(containerSet, context, replicator,
replicationConfig.getReplicationMaxStreams());
}

public ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
ContainerReplicator replicator, int poolSize) {
ContainerReplicator replicator, ReplicationConfig replicationConfig,
Clock clock) {
this(containerSet, context, replicator, new ThreadPoolExecutor(
poolSize, poolSize, 60, TimeUnit.SECONDS,
replicationConfig.getReplicationMaxStreams(),
replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ContainerReplicationThread-%d")
.build()));
}

public ReplicationSupervisor(ContainerSet containerSet,
ContainerReplicator replicator, int poolSize) {
this(containerSet, null, replicator, poolSize);
.build()), clock);
}

/**
Expand Down Expand Up @@ -148,6 +143,14 @@ public void run() {
try {
requestCounter.incrementAndGet();

if (task.getDeadline() > 0 && clock.millis() > task.getDeadline()) {
LOG.info("Ignoring this replicate container command for container" +
" {} since the current time {}ms is past the deadline {}ms",
containerId, clock.millis(), task.getDeadline());
timeoutCounter.incrementAndGet();
return;
}

if (context != null) {
DatanodeDetails dn = context.getParent().getDatanodeDetails();
if (dn.getPersistedOpState() !=
Expand Down Expand Up @@ -206,4 +209,9 @@ public long getReplicationSuccessCount() {
public long getReplicationFailureCount() {
return failureCounter.get();
}

public long getReplicationTimeoutCount() {
return timeoutCounter.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public void getMetrics(MetricsCollector collector, boolean all) {
supervisor.getQueueSize())
.addGauge(Interns.info("numRequestedReplications",
"Number of requested replications"),
supervisor.getReplicationRequestCount());
supervisor.getReplicationRequestCount())
.addGauge(Interns.info("numTimeoutReplications",
"Number of replication requests timed out before being processed"),
supervisor.getReplicationTimeoutCount());
}
}
Loading