From 7fd8d16f0c9f29d7f2eac0fe89374add6445e0fc Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Sun, 24 Jan 2021 20:32:20 -0500 Subject: [PATCH 1/2] TEZ-4269: Re-Work Threadpool in DAGAppMaster --- .../org/apache/tez/dag/app/AppContext.java | 5 +---- .../org/apache/tez/dag/app/DAGAppMaster.java | 18 +++++------------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index fc4ddcfcdc..c9a7083c1d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -20,7 +20,7 @@ import java.util.Map; import java.util.Set; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -74,9 +74,6 @@ public interface AppContext { DAG getCurrentDAG(); - // For testing only! - ThreadPoolExecutor getThreadPool(); - ListeningExecutorService getExecService(); void setDAG(DAG dag); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index cde77b3bf6..ea15555634 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -47,10 +47,10 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -302,7 +302,7 @@ public class DAGAppMaster extends AbstractService { private Path tezSystemStagingDir; private FileSystem recoveryFS; - private ThreadPoolExecutor rawExecutor; + private ExecutorService rawExecutor; private ListeningExecutorService execService; // TODO May not need to be a bidi map @@ -623,9 +623,9 @@ public synchronized void serviceInit(final Configuration conf) throws Exception TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT); // NOTE: LinkedBlockingQueue does not have a capacity Limit and can thus // occupy large memory chunks when numerous Runables are pending for execution - rawExecutor = new ThreadPoolExecutor(threadCount, threadCount, - 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build()); + rawExecutor = + Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("App Shared Pool - #%d").build()); execService = MoreExecutors.listeningDecorator(rawExecutor); initServices(conf); @@ -1505,14 +1505,6 @@ public DAG getCurrentDAG() { return dag; } - @Override - // For Testing only! - public ThreadPoolExecutor getThreadPool() { - synchronized (DAGAppMaster.this) { - return rawExecutor; - } - } - @Override public ListeningExecutorService getExecService() { return execService; From 42835aa07dd7aa3ada68cb0fb0a45219abb4ba77 Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Thu, 28 Jan 2021 11:00:44 -0500 Subject: [PATCH 2/2] Remove reference to rawExecutor --- tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index ea15555634..518a1fb362 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -302,7 +302,6 @@ public class DAGAppMaster extends AbstractService { private Path tezSystemStagingDir; private FileSystem recoveryFS; - private ExecutorService rawExecutor; private ListeningExecutorService execService; // TODO May not need to be a bidi map @@ -623,7 +622,7 @@ public synchronized void serviceInit(final Configuration conf) throws Exception TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT); // NOTE: LinkedBlockingQueue does not have a capacity Limit and can thus // occupy large memory chunks when numerous Runables are pending for execution - rawExecutor = + ExecutorService rawExecutor = Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("App Shared Pool - #%d").build()); execService = MoreExecutors.listeningDecorator(rawExecutor);