From ccb107df39638b25983db9cfbc326891c16b7976 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 11 Nov 2019 13:02:54 +0000 Subject: [PATCH 1/4] Datanode container delete commands will not run in a threadpool rather than on the main commandDispatcher thread --- .../statemachine/DatanodeStateMachine.java | 8 ++- .../commandhandler/CommandDispatcher.java | 6 ++ .../commandhandler/CommandHandler.java | 8 +++ .../DeleteContainerCommandHandler.java | 68 ++++++++++++++----- 4 files changed, 71 insertions(+), 19 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 2763278b0d2b..489e9fc0ebee 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -130,7 +130,9 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) - .addHandler(new DeleteContainerCommandHandler()) + .addHandler(new DeleteContainerCommandHandler(2)) + .addHandler(new ClosePipelineCommandHandler()) + .addHandler(new CreatePipelineCommandHandler()) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) @@ -278,6 +280,10 @@ public void close() throws IOException { if (jvmPauseMonitor != null) { jvmPauseMonitor.stop(); } + + if (commandDispatcher != null) { + commandDispatcher.stop(); + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index af854ec3d61a..911b1b8053a5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -104,6 +104,12 @@ public void handle(SCMCommand command) { } } + public void stop() { + for (CommandHandler c : handlerMap.values()) { + c.stop(); + } + } + public static Builder newBuilder() { return new Builder(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java index 1ea0ea845150..70ed9cab88c8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java @@ -72,4 +72,12 @@ default void updateCommandStatus(StateContext context, SCMCommand command, command.getId()); } } + + /** + * Override for any command with an internal threadpool, and stop the + * executor when this method is invoked. + */ + default void stop() { + // Default implementation does nothing + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index b54fb1a17ac0..bf6af56c3549 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.ozone.container.common.statemachine @@ -31,6 +32,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Handler to process the DeleteContainerCommand from SCM. @@ -40,28 +46,39 @@ public class DeleteContainerCommandHandler implements CommandHandler { private static final Logger LOG = LoggerFactory.getLogger(DeleteContainerCommandHandler.class); - private int invocationCount; - private long totalTime; + private final AtomicInteger invocationCount = new AtomicInteger(0); + private final AtomicLong totalTime = new AtomicLong(0); + private final ThreadPoolExecutor executor; + + public DeleteContainerCommandHandler(int threadPoolSize) { + this.executor = new ThreadPoolExecutor( + 0, threadPoolSize, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("DeleteContainerThread-%d") + .build()); + } @Override public void handle(final SCMCommand command, final OzoneContainer ozoneContainer, final StateContext context, final SCMConnectionManager connectionManager) { - final long startTime = Time.monotonicNow(); - invocationCount++; - try { - final DeleteContainerCommand deleteContainerCommand = - (DeleteContainerCommand) command; - final ContainerController controller = ozoneContainer.getController(); - controller.deleteContainer(deleteContainerCommand.getContainerID(), - deleteContainerCommand.isForce()); - } catch (IOException e) { - LOG.error("Exception occurred while deleting the container.", e); - } finally { - totalTime += Time.monotonicNow() - startTime; - } - + final DeleteContainerCommand deleteContainerCommand = + (DeleteContainerCommand) command; + final ContainerController controller = ozoneContainer.getController(); + executor.execute(() -> { + final long startTime = Time.monotonicNow(); + invocationCount.incrementAndGet(); + try { + controller.deleteContainer(deleteContainerCommand.getContainerID(), + deleteContainerCommand.isForce()); + } catch (IOException e) { + LOG.error("Exception occurred while deleting the container.", e); + } finally { + totalTime.getAndAdd(Time.monotonicNow() - startTime); + } + }); } @Override @@ -71,11 +88,26 @@ public SCMCommandProto.Type getCommandType() { @Override public int getInvocationCount() { - return this.invocationCount; + return this.invocationCount.get(); } @Override public long getAverageRunTime() { - return invocationCount == 0 ? 0 : totalTime / invocationCount; + return invocationCount.get() == 0 ? + 0 : totalTime.get() / invocationCount.get(); } + + @Override + public void stop() { + try { + executor.shutdown(); + if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // Ignore, we don't really care about the failure. + Thread.currentThread().interrupt(); + } + } + } From c7ccd7e1010f2cef1bff06e7f4d2d0bfdb6247d2 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Thu, 14 Nov 2019 21:31:53 +0000 Subject: [PATCH 2/4] Make delete container thread count configurable --- .../statemachine/DatanodeConfiguration.java | 26 +++++++++++++++++++ .../statemachine/DatanodeStateMachine.java | 3 ++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index dd0fa6722ea6..888bf0a7c6f8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -36,6 +36,11 @@ public class DatanodeConfiguration { * simultaneously. */ private int replicationMaxStreams = 10; + /** + * The maximum number of threads used to delete containers on a datanode + * simultaneously. + */ + private int deleteContainerThreads = 2; @Config(key = "replication.streams.limit", type = ConfigType.INT, @@ -58,4 +63,25 @@ public int getReplicationMaxStreams() { return replicationMaxStreams; } + @Config(key = "delete.container.threads", + type = ConfigType.INT, + defaultValue = "2", + tags = {DATANODE}, + description = "The maximum number of threads used to delete containers " + + "on a datanode" + ) + public void setDeleteContainerThreads(int val) { + if (val < 1) { + LOG.warn("hdds.datanode.delete.container.threads must be greater than" + + "zero and was set to {}. Defaulting to {}", + val, deleteContainerThreads); + } else { + this.deleteContainerThreads = val; + } + } + + public int getDeleteContainerThreads() { + return deleteContainerThreads; + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 489e9fc0ebee..6bfcbcd73b25 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -130,9 +130,10 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) - .addHandler(new DeleteContainerCommandHandler(2)) .addHandler(new ClosePipelineCommandHandler()) .addHandler(new CreatePipelineCommandHandler()) + .addHandler(new DeleteContainerCommandHandler( + dnConf.getDeleteContainerThreads())) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) From 471a648ba564a428e827883487495cbc33ccf9fa Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Tue, 19 Nov 2019 12:11:44 +0000 Subject: [PATCH 3/4] Removed references to handlers no longer in use --- .../container/common/statemachine/DatanodeStateMachine.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 6bfcbcd73b25..3dc371b00bf3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -130,8 +130,6 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) - .addHandler(new ClosePipelineCommandHandler()) - .addHandler(new CreatePipelineCommandHandler()) .addHandler(new DeleteContainerCommandHandler( dnConf.getDeleteContainerThreads())) .setConnectionManager(connectionManager) From 5d88bd5b600c4f3d4a96728daa06a7b3543330a5 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Tue, 19 Nov 2019 15:51:04 +0000 Subject: [PATCH 4/4] Updates related to review comments --- .../statemachine/DatanodeConfiguration.java | 26 +++++++++++-------- .../statemachine/DatanodeStateMachine.java | 2 +- .../DeleteContainerCommandHandler.java | 8 +++--- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 888bf0a7c6f8..3d2a834a7083 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -35,12 +35,14 @@ public class DatanodeConfiguration { * The maximum number of replication commands a single datanode can execute * simultaneously. */ - private int replicationMaxStreams = 10; + private final int replicationMaxStreamsDefault = 10; + private int replicationMaxStreams = replicationMaxStreamsDefault; /** * The maximum number of threads used to delete containers on a datanode * simultaneously. */ - private int deleteContainerThreads = 2; + private final int containerDeleteThreadsDefault = 2; + private int containerDeleteThreads = containerDeleteThreadsDefault; @Config(key = "replication.streams.limit", type = ConfigType.INT, @@ -53,7 +55,8 @@ public void setReplicationMaxStreams(int val) { if (val < 1) { LOG.warn("hdds.datanode.replication.streams.limit must be greater than" + "zero and was set to {}. Defaulting to {}", - val, replicationMaxStreams); + val, replicationMaxStreamsDefault); + replicationMaxStreams = replicationMaxStreamsDefault; } else { this.replicationMaxStreams = val; } @@ -63,25 +66,26 @@ public int getReplicationMaxStreams() { return replicationMaxStreams; } - @Config(key = "delete.container.threads", + @Config(key = "container.delete.threads.max", type = ConfigType.INT, defaultValue = "2", tags = {DATANODE}, description = "The maximum number of threads used to delete containers " + "on a datanode" ) - public void setDeleteContainerThreads(int val) { + public void setContainerDeleteThreads(int val) { if (val < 1) { - LOG.warn("hdds.datanode.delete.container.threads must be greater than" + - "zero and was set to {}. Defaulting to {}", - val, deleteContainerThreads); + LOG.warn("hdds.datanode.container.delete.threads.max must be greater " + + "than zero and was set to {}. Defaulting to {}", + val, containerDeleteThreadsDefault); + containerDeleteThreads = containerDeleteThreadsDefault; } else { - this.deleteContainerThreads = val; + this.containerDeleteThreads = val; } } - public int getDeleteContainerThreads() { - return deleteContainerThreads; + public int getContainerDeleteThreads() { + return containerDeleteThreads; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 3dc371b00bf3..5424b6b1da3e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -131,7 +131,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) .addHandler(new DeleteContainerCommandHandler( - dnConf.getDeleteContainerThreads())) + dnConf.getContainerDeleteThreads())) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index bf6af56c3549..9dc6021bfdcf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,7 +49,7 @@ public class DeleteContainerCommandHandler implements CommandHandler { private final AtomicInteger invocationCount = new AtomicInteger(0); private final AtomicLong totalTime = new AtomicLong(0); - private final ThreadPoolExecutor executor; + private final ExecutorService executor; public DeleteContainerCommandHandler(int threadPoolSize) { this.executor = new ThreadPoolExecutor( @@ -93,8 +94,9 @@ public int getInvocationCount() { @Override public long getAverageRunTime() { - return invocationCount.get() == 0 ? - 0 : totalTime.get() / invocationCount.get(); + final int invocations = invocationCount.get(); + return invocations == 0 ? + 0 : totalTime.get() / invocations; } @Override