From 670381b7d3dc953d15c4c781021ab4a0eced2f83 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Mon, 3 Jul 2023 19:33:32 +0530 Subject: [PATCH 01/12] TEZ-4344: Collect jstack periodically from all containers. Change-Id: I8d8b13c03a76a61c053b851743c1d2426e2fdd7b --- .../apache/tez/dag/api/TezConfiguration.java | 8 + .../org/apache/tez/dag/app/DAGAppMaster.java | 34 +++++ .../tez/runtime/TezThreadDumpHelper.java | 141 ++++++++++++++++++ .../org/apache/tez/runtime/task/TezChild.java | 28 +++- .../java/org/apache/tez/test/TestTezJobs.java | 44 +++++- 5 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 5842067ff8..d0addd4f8c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2296,4 +2296,12 @@ static Set getPropertySet() { @ConfigurationProperty public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties"; + /** + * Frequency at which thread dump should be captured. Supports TimeUnits. + */ + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty + public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; + public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0"; + } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 3c99b1afd9..baa2f5c62a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -185,6 +185,7 @@ import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; +import org.apache.tez.runtime.TezThreadDumpHelper; import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezMxBeanResourceCalculator; import org.codehaus.jettison.json.JSONException; @@ -199,6 +200,11 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; + /** * The Tez DAG Application Master. * The state machine is encapsulated in the implementation of Job interface. @@ -340,6 +346,7 @@ public class DAGAppMaster extends AbstractService { Map services = new LinkedHashMap(); private ThreadLocalMap mdcContext; + private HashMap tezThreadDumpHelper = new HashMap<>(); public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, @@ -766,6 +773,10 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: + TezThreadDumpHelper threadDumService = tezThreadDumpHelper.remove(currentDAG.getName()); + if (threadDumService != null) { + threadDumService.shutdownPeriodicThreadDumpService(); + } DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); @@ -952,6 +963,11 @@ public void shutdown(boolean now) { } LOG.info("Handling DAGAppMaster shutdown"); + for (Map.Entry t : tezThreadDumpHelper.entrySet()) { + t.getValue().shutdownPeriodicThreadDumpService(); + } + tezThreadDumpHelper.clear(); + AMShutdownRunnable r = new AMShutdownRunnable(now, sleepTimeBeforeExit); Thread t = new Thread(r, "AMShutdownThread"); t.start(); @@ -2577,6 +2593,24 @@ void stopVertexServices(DAG dag) { private void startDAGExecution(DAG dag, final Map additionalAmResources) throws TezException { currentDAG = dag; + + long periodicThreadDumpFrequency = dag.getConf() + .getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + if (periodicThreadDumpFrequency > 0) { + LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms", + periodicThreadDumpFrequency); + Path basePath = new Path(dag.getConf().get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); + try { + TezThreadDumpHelper tezThreadDumpService = + new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, dag.getConf()); + tezThreadDumpHelper.put(dag.getName(), tezThreadDumpService); + tezThreadDumpService.schedulePeriodicThreadDumpService(dag.getName() + "_AppMaster"); + } catch (IOException e) { + LOG.warn("Can not initialize periodic thread dump service for {}", dag.getName(), e); + } + } // Try localizing the actual resources. List additionalUrlsForClasspath; try { diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java new file mode 100644 index 0000000000..cbaa6fb773 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class TezThreadDumpHelper { + + private final long duration; + private final Path basePath; + private final FileSystem fs; + + static private final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean(); + private ScheduledExecutorService periodicThreadDumpServiceExecutor; + + public TezThreadDumpHelper(long duration, Path basePath, Configuration conf) throws IOException { + this.duration = duration; + this.basePath = basePath; + this.fs = basePath.getFileSystem(conf); + } + + public void schedulePeriodicThreadDumpService(String dagName) { + periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true). + setNameFormat("PeriodicThreadDumpService{" + dagName + "} #%d").build()); + Runnable threadDumpCollector = new ThreadDumpCollector(basePath, dagName, fs); + periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, TimeUnit.MILLISECONDS); + } + + public void shutdownPeriodicThreadDumpService() { + if (periodicThreadDumpServiceExecutor != null) { + periodicThreadDumpServiceExecutor.shutdown(); + + try { + if (!periodicThreadDumpServiceExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) { + periodicThreadDumpServiceExecutor.shutdownNow(); + } + } catch (InterruptedException ignored) { + // Ignore interrupt, will attempt a final shutdown below. + } + periodicThreadDumpServiceExecutor.shutdownNow(); + periodicThreadDumpServiceExecutor = null; + } + } + + private static class ThreadDumpCollector implements Runnable { + + private final Path path; + private final String dagName; + private final FileSystem fs; + + ThreadDumpCollector(Path path, String dagName, FileSystem fs) { + this.path = path; + this.fs = fs; + this.dagName = dagName; + } + + @Override + public void run() { + if (!Thread.interrupted()) { + try (FSDataOutputStream fsStream = fs.create( + new Path(path, dagName + "_" + System.currentTimeMillis() + ".jstack")); + PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) { + printThreadInfo(printStream, dagName); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public synchronized void printThreadInfo(PrintStream stream, String title) { + boolean contention = THREAD_BEAN.isThreadContentionMonitoringEnabled(); + long[] threadIds = THREAD_BEAN.getAllThreadIds(); + stream.println("Process Thread Dump: " + title); + stream.println(threadIds.length + " active threads"); + for (long tid : threadIds) { + ThreadInfo info = THREAD_BEAN.getThreadInfo(tid, Integer.MAX_VALUE); + if (info == null) { + stream.println(" Inactive"); + continue; + } + stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":"); + Thread.State state = info.getThreadState(); + stream.println(" State: " + state); + stream.println(" Blocked count: " + info.getBlockedCount()); + stream.println(" Waited count: " + info.getWaitedCount()); + if (contention) { + stream.println(" Blocked time: " + info.getBlockedTime()); + stream.println(" Waited time: " + info.getWaitedTime()); + } + if (state == Thread.State.WAITING) { + stream.println(" Waiting on " + info.getLockName()); + } else if (state == Thread.State.BLOCKED) { + stream.println(" Blocked on " + info.getLockName()); + stream.println(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName())); + } + stream.println(" Stack:"); + for (StackTraceElement frame : info.getStackTrace()) { + stream.println(" " + frame.toString()); + } + } + stream.flush(); + } + + private String getTaskName(long id, String name) { + if (name == null) { + return Long.toString(id); + } + return id + " (" + name + ")"; + } + } +} diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 7ab532ad33..831e8ef4f0 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -40,6 +41,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; @@ -69,6 +71,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; +import org.apache.tez.runtime.TezThreadDumpHelper; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; @@ -91,6 +94,11 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; + public class TezChild { private static final Logger LOG = LoggerFactory.getLogger(TezChild.class); @@ -119,6 +127,7 @@ public class TezChild { private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final String user; private final boolean updateSysCounters; + private TezThreadDumpHelper tezThreadDumpHelper = null; private Multimap startedInputsMap = HashMultimap.create(); private final boolean ownUmbilical; @@ -178,7 +187,7 @@ public TezChild(Configuration conf, String host, int port, String containerIdent if (LOG.isDebugEnabled()) { LOG.debug("Executing with tokens:"); for (Token token : credentials.getAllTokens()) { - LOG.debug("",token); + LOG.debug("{}",token); } } @@ -207,6 +216,16 @@ public TezTaskUmbilicalProtocol run() throws Exception { ownUmbilical = false; } TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit"); + + long periodicThreadDumpFrequency = + defaultConf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + + if (periodicThreadDumpFrequency > 0) { + LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms", + periodicThreadDumpFrequency); + Path basePath = new Path(defaultConf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); + tezThreadDumpHelper = new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, defaultConf); + } } public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { @@ -229,6 +248,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, ContainerTask containerTask = null; try { containerTask = getTaskFuture.get(); + if (tezThreadDumpHelper != null) { + tezThreadDumpHelper.schedulePeriodicThreadDumpService(containerTask.getTaskSpec().getDAGName()); + } } catch (ExecutionException e) { error = true; Throwable cause = e.getCause(); @@ -425,6 +447,10 @@ public void shutdown() { RPC.stopProxy(umbilical); } } + + if (tezThreadDumpHelper != null) { + tezThreadDumpHelper.shutdownPeriodicThreadDumpService(); + } TezRuntimeShutdownHandler.shutdown(); LOG.info("TezChild shutdown finished"); } diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index fd8f08b42c..54cf99be13 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -18,6 +18,9 @@ package org.apache.tez.test; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -40,6 +43,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.tez.common.Preconditions; import com.google.common.collect.Lists; @@ -533,8 +538,21 @@ public void testPerIOCounterAggregation() throws Exception { @Test(timeout = 60000) public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception { + testSortMergeJoinExampleDisableSplitGrouping(false); + } + + @Test + public void testSortMergeJoinExampleWithThreadDump() throws Exception { + testSortMergeJoinExampleDisableSplitGrouping(true); + } + + public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDum) throws Exception { SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample(); - sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig())); + Configuration newConf = new Configuration(mrrTezCluster.getConfig()); + if (withThreadDum) { + newConf.set(TEZ_THREAD_DUMP_INTERVAL, "2ms"); + } + sortMergeJoinExample.setConf(newConf); Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir"); Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath1"); Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath2"); @@ -587,6 +605,30 @@ public boolean accept(Path p) { reader.close(); inStream.close(); assertEquals(0, expectedResult.size()); + + if (withThreadDum) { + validateThreadDumpCaptured(); + } + } + + private static void validateThreadDumpCaptured() throws IOException { + Path jstackPath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); + RemoteIterator files = localFs.listFiles(jstackPath, true); + boolean threadDumpFound = false; + boolean appMasterDumpFound = false; + while (files.hasNext()) { + LocatedFileStatus file = files.next(); + if (file.getPath().getName().endsWith(".jstack")) { + if(file.getPath().getName().contains("AppMaster")) { + appMasterDumpFound = true; + } else { + threadDumpFound = true; + } + } + } + assertTrue(threadDumpFound); + assertTrue(appMasterDumpFound); + localFs.delete(jstackPath, true); } /** From 996f3078d590247a6c53646ebc36304e4cdf1e76 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 4 Jul 2023 00:40:34 +0530 Subject: [PATCH 02/12] Remove use of Set and keep single instance. Change-Id: I0353a0dc983a5c02b7bf183d1d5541f15af388dd --- .../org/apache/tez/dag/app/DAGAppMaster.java | 24 +++++++++---------- .../org/apache/tez/runtime/task/TezChild.java | 2 +- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index baa2f5c62a..719ee7de3d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -346,7 +346,7 @@ public class DAGAppMaster extends AbstractService { Map services = new LinkedHashMap(); private ThreadLocalMap mdcContext; - private HashMap tezThreadDumpHelper = new HashMap<>(); + private TezThreadDumpHelper tezThreadDumpHelper = null; public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, @@ -773,9 +773,9 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: - TezThreadDumpHelper threadDumService = tezThreadDumpHelper.remove(currentDAG.getName()); - if (threadDumService != null) { - threadDumService.shutdownPeriodicThreadDumpService(); + if (tezThreadDumpHelper != null) { + tezThreadDumpHelper.shutdownPeriodicThreadDumpService(); + tezThreadDumpHelper = null; } DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; @@ -963,10 +963,11 @@ public void shutdown(boolean now) { } LOG.info("Handling DAGAppMaster shutdown"); - for (Map.Entry t : tezThreadDumpHelper.entrySet()) { - t.getValue().shutdownPeriodicThreadDumpService(); + // Check if the thread dump service is up in any case, if yes attempt a shutdown + if (tezThreadDumpHelper != null) { + tezThreadDumpHelper.shutdownPeriodicThreadDumpService(); + tezThreadDumpHelper = null; } - tezThreadDumpHelper.clear(); AMShutdownRunnable r = new AMShutdownRunnable(now, sleepTimeBeforeExit); Thread t = new Thread(r, "AMShutdownThread"); @@ -2595,18 +2596,15 @@ private void startDAGExecution(DAG dag, final Map additio currentDAG = dag; long periodicThreadDumpFrequency = dag.getConf() - .getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); + .getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); if (periodicThreadDumpFrequency > 0) { LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms", periodicThreadDumpFrequency); Path basePath = new Path(dag.getConf().get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); try { - TezThreadDumpHelper tezThreadDumpService = - new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, dag.getConf()); - tezThreadDumpHelper.put(dag.getName(), tezThreadDumpService); - tezThreadDumpService.schedulePeriodicThreadDumpService(dag.getName() + "_AppMaster"); + tezThreadDumpHelper = new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, dag.getConf()); + tezThreadDumpHelper.schedulePeriodicThreadDumpService(dag.getName() + "_AppMaster"); } catch (IOException e) { LOG.warn("Can not initialize periodic thread dump service for {}", dag.getName(), e); } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 831e8ef4f0..62cf34da54 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -187,7 +187,7 @@ public TezChild(Configuration conf, String host, int port, String containerIdent if (LOG.isDebugEnabled()) { LOG.debug("Executing with tokens:"); for (Token token : credentials.getAllTokens()) { - LOG.debug("{}",token); + LOG.debug("{}", token); } } From 9b4f9f639073ca9e9ca730121f5c7e1f842f7a93 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 4 Jul 2023 00:54:15 +0530 Subject: [PATCH 03/12] Remove use of Set and keep single instance + Refactor. Change-Id: Ib0167ef6b52faebc02876ab895f69d8c592a0aca --- .../org/apache/tez/dag/app/DAGAppMaster.java | 22 +++---------- .../tez/runtime/TezThreadDumpHelper.java | 31 +++++++++++++++++-- .../org/apache/tez/runtime/task/TezChild.java | 15 ++------- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 719ee7de3d..c8bf827dc9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -200,10 +200,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; +import static org.apache.tez.runtime.TezThreadDumpHelper.getTezThreadDumpHelper; /** * The Tez DAG Application Master. @@ -2595,20 +2592,11 @@ private void startDAGExecution(DAG dag, final Map additio throws TezException { currentDAG = dag; - long periodicThreadDumpFrequency = dag.getConf() - .getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - - if (periodicThreadDumpFrequency > 0) { - LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms", - periodicThreadDumpFrequency); - Path basePath = new Path(dag.getConf().get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); - try { - tezThreadDumpHelper = new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, dag.getConf()); - tezThreadDumpHelper.schedulePeriodicThreadDumpService(dag.getName() + "_AppMaster"); - } catch (IOException e) { - LOG.warn("Can not initialize periodic thread dump service for {}", dag.getName(), e); - } + tezThreadDumpHelper = getTezThreadDumpHelper(dag.getConf()); + if (tezThreadDumpHelper != null) { + tezThreadDumpHelper.schedulePeriodicThreadDumpService(dag.getName() + "_AppMaster"); } + // Try localizing the actual resources. List additionalUrlsForClasspath; try { diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index cbaa6fb773..668eb2745c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -23,6 +23,8 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.PrintStream; @@ -33,21 +35,46 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; + public class TezThreadDumpHelper { private final long duration; private final Path basePath; private final FileSystem fs; - static private final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean(); + private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean(); + private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class); + private ScheduledExecutorService periodicThreadDumpServiceExecutor; - public TezThreadDumpHelper(long duration, Path basePath, Configuration conf) throws IOException { + private TezThreadDumpHelper(long duration, Path basePath, Configuration conf) throws IOException { this.duration = duration; this.basePath = basePath; this.fs = basePath.getFileSystem(conf); } + public static TezThreadDumpHelper getTezThreadDumpHelper(Configuration conf) { + long periodicThreadDumpFrequency = + conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + + if (periodicThreadDumpFrequency > 0) { + Path basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); + LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " + + "path: {}", periodicThreadDumpFrequency, basePath); + try { + return new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, conf); + } catch (IOException e) { + LOG.warn("Can not initialize periodic thread dump service", e); + } + + } + return null; + } + public void schedulePeriodicThreadDumpService(String dagName) { periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true). diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 62cf34da54..29b5708c36 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -94,10 +94,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; +import static org.apache.tez.runtime.TezThreadDumpHelper.getTezThreadDumpHelper; public class TezChild { @@ -217,15 +214,7 @@ public TezTaskUmbilicalProtocol run() throws Exception { } TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit"); - long periodicThreadDumpFrequency = - defaultConf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - - if (periodicThreadDumpFrequency > 0) { - LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms", - periodicThreadDumpFrequency); - Path basePath = new Path(defaultConf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); - tezThreadDumpHelper = new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, defaultConf); - } + tezThreadDumpHelper = getTezThreadDumpHelper(conf); } public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { From 41cfcdc1f6867916fc19da33548674029c7b3b3b Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 4 Jul 2023 23:07:05 +0530 Subject: [PATCH 04/12] Fix Checkstyle. Change-Id: I3d484a6dcadd8f8b880cceb12cae48e22b2580a7 --- .../main/java/org/apache/tez/runtime/TezThreadDumpHelper.java | 2 +- .../src/main/java/org/apache/tez/runtime/task/TezChild.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 668eb2745c..57a3db03ee 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -40,7 +40,7 @@ import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; -public class TezThreadDumpHelper { +public final class TezThreadDumpHelper { private final long duration; private final Path basePath; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 29b5708c36..609314a5be 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -33,7 +33,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -41,7 +40,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; From 49687d4ce6b91b5e28afbb23043d221d0559528d Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 5 Jul 2023 18:00:12 +0530 Subject: [PATCH 05/12] Review Comments.: Change-Id: Ie16c5b180d4b19fef43144f48c279e7382965476 --- .../org/apache/tez/dag/app/DAGAppMaster.java | 24 +++++++------------ .../tez/runtime/TezThreadDumpHelper.java | 16 +++++++++++-- .../org/apache/tez/runtime/task/TezChild.java | 14 ++++------- .../java/org/apache/tez/test/TestTezJobs.java | 6 ++--- 4 files changed, 30 insertions(+), 30 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c8bf827dc9..c8f31b5307 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -200,8 +200,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import static org.apache.tez.runtime.TezThreadDumpHelper.getTezThreadDumpHelper; - /** * The Tez DAG Application Master. * The state machine is encapsulated in the implementation of Job interface. @@ -770,10 +768,8 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: - if (tezThreadDumpHelper != null) { - tezThreadDumpHelper.shutdownPeriodicThreadDumpService(); - tezThreadDumpHelper = null; - } + TezThreadDumpHelper.shutdownPeriodicThreadDumpService(tezThreadDumpHelper); + tezThreadDumpHelper = null; DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); @@ -960,12 +956,6 @@ public void shutdown(boolean now) { } LOG.info("Handling DAGAppMaster shutdown"); - // Check if the thread dump service is up in any case, if yes attempt a shutdown - if (tezThreadDumpHelper != null) { - tezThreadDumpHelper.shutdownPeriodicThreadDumpService(); - tezThreadDumpHelper = null; - } - AMShutdownRunnable r = new AMShutdownRunnable(now, sleepTimeBeforeExit); Thread t = new Thread(r, "AMShutdownThread"); t.start(); @@ -2215,6 +2205,10 @@ public Void run() throws Exception { execService.shutdownNow(); } + // Check if the thread dump service is up in any case, if yes attempt a shutdown + TezThreadDumpHelper.shutdownPeriodicThreadDumpService(tezThreadDumpHelper); + tezThreadDumpHelper = null; + super.serviceStop(); } @@ -2592,10 +2586,8 @@ private void startDAGExecution(DAG dag, final Map additio throws TezException { currentDAG = dag; - tezThreadDumpHelper = getTezThreadDumpHelper(dag.getConf()); - if (tezThreadDumpHelper != null) { - tezThreadDumpHelper.schedulePeriodicThreadDumpService(dag.getName() + "_AppMaster"); - } + tezThreadDumpHelper = TezThreadDumpHelper.getTezThreadDumpHelper(dag.getConf()); + TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, dag.getName() + "_AppMaster"); // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 57a3db03ee..f688509527 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -75,7 +75,13 @@ public static TezThreadDumpHelper getTezThreadDumpHelper(Configuration conf) { return null; } - public void schedulePeriodicThreadDumpService(String dagName) { + public static void schedulePeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper, String dagName) { + if (threadDumpHelper != null) { + threadDumpHelper.schedulePeriodicThreadDumpService(dagName); + } + } + + private void schedulePeriodicThreadDumpService(String dagName) { periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true). setNameFormat("PeriodicThreadDumpService{" + dagName + "} #%d").build()); @@ -83,7 +89,13 @@ public void schedulePeriodicThreadDumpService(String dagName) { periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, TimeUnit.MILLISECONDS); } - public void shutdownPeriodicThreadDumpService() { + public static void shutdownPeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper) { + if (threadDumpHelper != null) { + threadDumpHelper.shutdownPeriodicThreadDumpService(); + } + } + + private void shutdownPeriodicThreadDumpService() { if (periodicThreadDumpServiceExecutor != null) { periodicThreadDumpServiceExecutor.shutdown(); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 609314a5be..2b107574ef 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -92,8 +92,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import static org.apache.tez.runtime.TezThreadDumpHelper.getTezThreadDumpHelper; - public class TezChild { private static final Logger LOG = LoggerFactory.getLogger(TezChild.class); @@ -212,7 +210,7 @@ public TezTaskUmbilicalProtocol run() throws Exception { } TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit"); - tezThreadDumpHelper = getTezThreadDumpHelper(conf); + tezThreadDumpHelper = TezThreadDumpHelper.getTezThreadDumpHelper(conf); } public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { @@ -235,9 +233,8 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, ContainerTask containerTask = null; try { containerTask = getTaskFuture.get(); - if (tezThreadDumpHelper != null) { - tezThreadDumpHelper.schedulePeriodicThreadDumpService(containerTask.getTaskSpec().getDAGName()); - } + TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, + containerTask.getTaskSpec().getDAGName()); } catch (ExecutionException e) { error = true; Throwable cause = e.getCause(); @@ -435,9 +432,8 @@ public void shutdown() { } } - if (tezThreadDumpHelper != null) { - tezThreadDumpHelper.shutdownPeriodicThreadDumpService(); - } + TezThreadDumpHelper.shutdownPeriodicThreadDumpService(tezThreadDumpHelper); + tezThreadDumpHelper = null; TezRuntimeShutdownHandler.shutdown(); LOG.info("TezChild shutdown finished"); } diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 54cf99be13..586f950167 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -546,10 +546,10 @@ public void testSortMergeJoinExampleWithThreadDump() throws Exception { testSortMergeJoinExampleDisableSplitGrouping(true); } - public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDum) throws Exception { + public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDump) throws Exception { SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample(); Configuration newConf = new Configuration(mrrTezCluster.getConfig()); - if (withThreadDum) { + if (withThreadDump) { newConf.set(TEZ_THREAD_DUMP_INTERVAL, "2ms"); } sortMergeJoinExample.setConf(newConf); @@ -606,7 +606,7 @@ public boolean accept(Path p) { inStream.close(); assertEquals(0, expectedResult.size()); - if (withThreadDum) { + if (withThreadDump) { validateThreadDumpCaptured(); } } From 24d50f11496f21e60607751125d35e1a712b85dd Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 6 Jul 2023 00:54:55 +0530 Subject: [PATCH 06/12] Address Review Comments. Change-Id: I2be6bf61b3925f61a68891d722a830b988f77263 --- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../tez/runtime/TezThreadDumpHelper.java | 24 ++++++++++++------- .../org/apache/tez/runtime/task/TezChild.java | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c8f31b5307..449ee4c94c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2587,7 +2587,7 @@ private void startDAGExecution(DAG dag, final Map additio currentDAG = dag; tezThreadDumpHelper = TezThreadDumpHelper.getTezThreadDumpHelper(dag.getConf()); - TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, dag.getName() + "_AppMaster"); + TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, dag.getID() + "_AppMaster"); // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index f688509527..a7cd5af776 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -23,6 +23,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.Appender; +import org.apache.tez.common.TezContainerLogAppender; +import org.apache.tez.dag.api.TezConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +54,19 @@ public final class TezThreadDumpHelper { private ScheduledExecutorService periodicThreadDumpServiceExecutor; - private TezThreadDumpHelper(long duration, Path basePath, Configuration conf) throws IOException { + private TezThreadDumpHelper(long duration, Configuration conf) throws IOException { this.duration = duration; - this.basePath = basePath; - this.fs = basePath.getFileSystem(conf); + Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME); + if (appender instanceof TezContainerLogAppender) { + this.basePath = new Path(((TezContainerLogAppender) appender).getContainerLogDir()); + this.fs = FileSystem.getLocal(conf); + } else { + // Fallback, if it is any other appender or if none is configured. + this.basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); + this.fs = this.basePath.getFileSystem(conf); + } + LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " + + "path: {}", duration, basePath); } public static TezThreadDumpHelper getTezThreadDumpHelper(Configuration conf) { @@ -62,15 +74,11 @@ public static TezThreadDumpHelper getTezThreadDumpHelper(Configuration conf) { conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); if (periodicThreadDumpFrequency > 0) { - Path basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); - LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " + - "path: {}", periodicThreadDumpFrequency, basePath); try { - return new TezThreadDumpHelper(periodicThreadDumpFrequency, basePath, conf); + return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf); } catch (IOException e) { LOG.warn("Can not initialize periodic thread dump service", e); } - } return null; } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 2b107574ef..9e7186076e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -234,7 +234,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, try { containerTask = getTaskFuture.get(); TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, - containerTask.getTaskSpec().getDAGName()); + containerTask.getTaskSpec().getDAGID().toString()); } catch (ExecutionException e) { error = true; Throwable cause = e.getCause(); From dba9265db5efe68eac4e15158164e7ad428f7b26 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 6 Jul 2023 02:09:06 +0530 Subject: [PATCH 07/12] Improve Test & Names. Change-Id: I9d88c928f6591f5590f4bbe601364b8eddf5d8b4 --- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../tez/runtime/TezThreadDumpHelper.java | 20 +++++++------- .../org/apache/tez/runtime/task/TezChild.java | 2 +- .../java/org/apache/tez/test/TestTezJobs.java | 27 ++++++++++--------- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 449ee4c94c..558ed1cc07 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2587,7 +2587,7 @@ private void startDAGExecution(DAG dag, final Map additio currentDAG = dag; tezThreadDumpHelper = TezThreadDumpHelper.getTezThreadDumpHelper(dag.getConf()); - TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, dag.getID() + "_AppMaster"); + TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, dag.getID().toString()); // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index a7cd5af776..6e591a6a8b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -83,17 +83,17 @@ public static TezThreadDumpHelper getTezThreadDumpHelper(Configuration conf) { return null; } - public static void schedulePeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper, String dagName) { + public static void schedulePeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper, String name) { if (threadDumpHelper != null) { - threadDumpHelper.schedulePeriodicThreadDumpService(dagName); + threadDumpHelper.schedulePeriodicThreadDumpService(name); } } - private void schedulePeriodicThreadDumpService(String dagName) { + private void schedulePeriodicThreadDumpService(String name) { periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true). - setNameFormat("PeriodicThreadDumpService{" + dagName + "} #%d").build()); - Runnable threadDumpCollector = new ThreadDumpCollector(basePath, dagName, fs); + setNameFormat("PeriodicThreadDumpService{" + name + "} #%d").build()); + Runnable threadDumpCollector = new ThreadDumpCollector(basePath, name, fs); periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, TimeUnit.MILLISECONDS); } @@ -122,22 +122,22 @@ private void shutdownPeriodicThreadDumpService() { private static class ThreadDumpCollector implements Runnable { private final Path path; - private final String dagName; + private final String name; private final FileSystem fs; - ThreadDumpCollector(Path path, String dagName, FileSystem fs) { + ThreadDumpCollector(Path path, String name, FileSystem fs) { this.path = path; this.fs = fs; - this.dagName = dagName; + this.name = name; } @Override public void run() { if (!Thread.interrupted()) { try (FSDataOutputStream fsStream = fs.create( - new Path(path, dagName + "_" + System.currentTimeMillis() + ".jstack")); + new Path(path, name + "_" + System.currentTimeMillis() + ".jstack")); PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) { - printThreadInfo(printStream, dagName); + printThreadInfo(printStream, name); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 9e7186076e..800e4226f2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -234,7 +234,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, try { containerTask = getTaskFuture.get(); TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, - containerTask.getTaskSpec().getDAGID().toString()); + containerTask.getTaskSpec().getTaskAttemptID().toString()); } catch (ExecutionException e) { error = true; Throwable cause = e.getCause(); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 586f950167..5f7658a22a 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -18,9 +18,8 @@ package org.apache.tez.test; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -52,6 +51,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.tez.common.TezContainerLogAppender; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; @@ -549,8 +549,13 @@ public void testSortMergeJoinExampleWithThreadDump() throws Exception { public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDump) throws Exception { SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample(); Configuration newConf = new Configuration(mrrTezCluster.getConfig()); + Path logPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/logPath"); if (withThreadDump) { - newConf.set(TEZ_THREAD_DUMP_INTERVAL, "2ms"); + TezContainerLogAppender appender = new TezContainerLogAppender(); + org.apache.log4j.Logger.getRootLogger().addAppender(appender); + appender.setName(TEZ_CONTAINER_LOGGER_NAME); + appender.setContainerLogDir(logPath.toString()); + newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms"); } sortMergeJoinExample.setConf(newConf); Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir"); @@ -607,28 +612,26 @@ public boolean accept(Path p) { assertEquals(0, expectedResult.size()); if (withThreadDump) { - validateThreadDumpCaptured(); + validateThreadDumpCaptured(logPath); } } - private static void validateThreadDumpCaptured() throws IOException { - Path jstackPath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR)); + private static void validateThreadDumpCaptured(Path jstackPath) throws IOException { RemoteIterator files = localFs.listFiles(jstackPath, true); - boolean threadDumpFound = false; boolean appMasterDumpFound = false; + boolean tezChildDumpFound = false; while (files.hasNext()) { LocatedFileStatus file = files.next(); if (file.getPath().getName().endsWith(".jstack")) { - if(file.getPath().getName().contains("AppMaster")) { - appMasterDumpFound = true; + if (file.getPath().getName().contains("attempt")) { + tezChildDumpFound = true; } else { - threadDumpFound = true; + appMasterDumpFound = true; } } } - assertTrue(threadDumpFound); + assertTrue(tezChildDumpFound); assertTrue(appMasterDumpFound); - localFs.delete(jstackPath, true); } /** From 820b2d2ef7a61974e42df13788e2ae2af9194f9d Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 6 Jul 2023 09:38:48 +0530 Subject: [PATCH 08/12] Fix Checkstyle. Change-Id: I33993adb8ebd811dae1b3e3eed3d75641fae2956 --- .../java/org/apache/tez/runtime/TezThreadDumpHelper.java | 6 +++--- .../src/test/java/org/apache/tez/test/TestTezJobs.java | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 6e591a6a8b..5c1dac05ad 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -178,11 +178,11 @@ public synchronized void printThreadInfo(PrintStream stream, String title) { stream.flush(); } - private String getTaskName(long id, String name) { - if (name == null) { + private String getTaskName(long id, String taskName) { + if (taskName == null) { return Long.toString(id); } - return id + " (" + name + ")"; + return id + " (" + taskName + ")"; } } } diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 5f7658a22a..892629f29e 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -613,6 +613,7 @@ public boolean accept(Path p) { if (withThreadDump) { validateThreadDumpCaptured(logPath); + org.apache.log4j.Logger.getRootLogger().removeAppender(TEZ_CONTAINER_LOGGER_NAME); } } From 1d7f438cc8532f7eb9fa6e6c3727a44d74b33f0f Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 6 Jul 2023 13:39:11 +0530 Subject: [PATCH 09/12] Rename methods. Change-Id: I4ab43d923511668c8e51e8a571bda153b6ccf102 --- .../java/org/apache/tez/dag/app/DAGAppMaster.java | 10 ++++------ .../apache/tez/runtime/TezThreadDumpHelper.java | 14 +++++++------- .../java/org/apache/tez/runtime/task/TezChild.java | 7 +++---- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 558ed1cc07..3ddd8438da 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -768,8 +768,7 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: - TezThreadDumpHelper.shutdownPeriodicThreadDumpService(tezThreadDumpHelper); - tezThreadDumpHelper = null; + TezThreadDumpHelper.stopPeriodicThreadDumpService(tezThreadDumpHelper); DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); @@ -2206,8 +2205,7 @@ public Void run() throws Exception { } // Check if the thread dump service is up in any case, if yes attempt a shutdown - TezThreadDumpHelper.shutdownPeriodicThreadDumpService(tezThreadDumpHelper); - tezThreadDumpHelper = null; + TezThreadDumpHelper.stopPeriodicThreadDumpService(tezThreadDumpHelper); super.serviceStop(); } @@ -2586,8 +2584,8 @@ private void startDAGExecution(DAG dag, final Map additio throws TezException { currentDAG = dag; - tezThreadDumpHelper = TezThreadDumpHelper.getTezThreadDumpHelper(dag.getConf()); - TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, dag.getID().toString()); + tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()); + TezThreadDumpHelper.startPeriodicThreadDumpService(tezThreadDumpHelper, dag.getID().toString()); // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 5c1dac05ad..9649f68d7f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -69,7 +69,7 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio "path: {}", duration, basePath); } - public static TezThreadDumpHelper getTezThreadDumpHelper(Configuration conf) { + public static TezThreadDumpHelper getInstance(Configuration conf) { long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); @@ -83,13 +83,13 @@ public static TezThreadDumpHelper getTezThreadDumpHelper(Configuration conf) { return null; } - public static void schedulePeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper, String name) { + public static void startPeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper, String name) { if (threadDumpHelper != null) { - threadDumpHelper.schedulePeriodicThreadDumpService(name); + threadDumpHelper.startPeriodicThreadDumpService(name); } } - private void schedulePeriodicThreadDumpService(String name) { + private void startPeriodicThreadDumpService(String name) { periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true). setNameFormat("PeriodicThreadDumpService{" + name + "} #%d").build()); @@ -97,13 +97,13 @@ private void schedulePeriodicThreadDumpService(String name) { periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, TimeUnit.MILLISECONDS); } - public static void shutdownPeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper) { + public static void stopPeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper) { if (threadDumpHelper != null) { - threadDumpHelper.shutdownPeriodicThreadDumpService(); + threadDumpHelper.stopPeriodicThreadDumpService(); } } - private void shutdownPeriodicThreadDumpService() { + private void stopPeriodicThreadDumpService() { if (periodicThreadDumpServiceExecutor != null) { periodicThreadDumpServiceExecutor.shutdown(); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 800e4226f2..4a5235e8ec 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -210,7 +210,7 @@ public TezTaskUmbilicalProtocol run() throws Exception { } TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit"); - tezThreadDumpHelper = TezThreadDumpHelper.getTezThreadDumpHelper(conf); + tezThreadDumpHelper = TezThreadDumpHelper.getInstance(conf); } public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { @@ -233,7 +233,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, ContainerTask containerTask = null; try { containerTask = getTaskFuture.get(); - TezThreadDumpHelper.schedulePeriodicThreadDumpService(tezThreadDumpHelper, + TezThreadDumpHelper.startPeriodicThreadDumpService(tezThreadDumpHelper, containerTask.getTaskSpec().getTaskAttemptID().toString()); } catch (ExecutionException e) { error = true; @@ -432,8 +432,7 @@ public void shutdown() { } } - TezThreadDumpHelper.shutdownPeriodicThreadDumpService(tezThreadDumpHelper); - tezThreadDumpHelper = null; + TezThreadDumpHelper.stopPeriodicThreadDumpService(tezThreadDumpHelper); TezRuntimeShutdownHandler.shutdown(); LOG.info("TezChild shutdown finished"); } From 0aa473870c44320e466dcfa68c38eeaa03da5fa4 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 6 Jul 2023 13:59:16 +0530 Subject: [PATCH 10/12] Refactor to use task conf. Change-Id: If88667b9e978b648c8d3e85535feeb24201d9879 --- .../apache/tez/dag/api/TezConfiguration.java | 2 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 8 ++- .../tez/runtime/TezThreadDumpHelper.java | 54 ++++++++++--------- .../org/apache/tez/runtime/task/TezChild.java | 8 ++- 4 files changed, 37 insertions(+), 35 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index d0addd4f8c..9e2e2d89cf 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2302,6 +2302,6 @@ static Set getPropertySet() { @ConfigurationScope(Scope.DAG) @ConfigurationProperty public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; - public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0"; + public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms"; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 3ddd8438da..0e4a1df537 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -768,7 +768,7 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: - TezThreadDumpHelper.stopPeriodicThreadDumpService(tezThreadDumpHelper); + tezThreadDumpHelper.stop(); DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); @@ -2205,7 +2205,7 @@ public Void run() throws Exception { } // Check if the thread dump service is up in any case, if yes attempt a shutdown - TezThreadDumpHelper.stopPeriodicThreadDumpService(tezThreadDumpHelper); + tezThreadDumpHelper.stop(); super.serviceStop(); } @@ -2583,9 +2583,7 @@ void stopVertexServices(DAG dag) { private void startDAGExecution(DAG dag, final Map additionalAmResources) throws TezException { currentDAG = dag; - - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()); - TezThreadDumpHelper.startPeriodicThreadDumpService(tezThreadDumpHelper, dag.getID().toString()); + tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString()); // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 9649f68d7f..8934dbd961 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -43,11 +43,11 @@ import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; -public final class TezThreadDumpHelper { +public class TezThreadDumpHelper { - private final long duration; - private final Path basePath; - private final FileSystem fs; + private long duration = 0L; + private Path basePath = null; + private FileSystem fs = null; private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean(); private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class); @@ -69,6 +69,9 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio "path: {}", duration, basePath); } + public TezThreadDumpHelper() { + } + public static TezThreadDumpHelper getInstance(Configuration conf) { long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); @@ -80,30 +83,19 @@ public static TezThreadDumpHelper getInstance(Configuration conf) { LOG.warn("Can not initialize periodic thread dump service", e); } } - return null; - } - - public static void startPeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper, String name) { - if (threadDumpHelper != null) { - threadDumpHelper.startPeriodicThreadDumpService(name); - } + return new NoopTezThreadDumpHelper(); } - private void startPeriodicThreadDumpService(String name) { + public TezThreadDumpHelper start(String name) { periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder().setDaemon(true). - setNameFormat("PeriodicThreadDumpService{" + name + "} #%d").build()); + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PeriodicThreadDumpService{" + name + "} #%d") + .build()); Runnable threadDumpCollector = new ThreadDumpCollector(basePath, name, fs); periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, TimeUnit.MILLISECONDS); + return this; } - public static void stopPeriodicThreadDumpService(TezThreadDumpHelper threadDumpHelper) { - if (threadDumpHelper != null) { - threadDumpHelper.stopPeriodicThreadDumpService(); - } - } - - private void stopPeriodicThreadDumpService() { + public void stop() { if (periodicThreadDumpServiceExecutor != null) { periodicThreadDumpServiceExecutor.shutdown(); @@ -134,9 +126,9 @@ private static class ThreadDumpCollector implements Runnable { @Override public void run() { if (!Thread.interrupted()) { - try (FSDataOutputStream fsStream = fs.create( - new Path(path, name + "_" + System.currentTimeMillis() + ".jstack")); - PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) { + try (FSDataOutputStream fsStream = fs.create(new Path(path, + name + "_" + System.currentTimeMillis() + ".jstack")); PrintStream printStream = new PrintStream(fsStream, + false, "UTF8")) { printThreadInfo(printStream, name); } catch (IOException e) { throw new RuntimeException(e); @@ -185,4 +177,18 @@ private String getTaskName(long id, String taskName) { return id + " (" + taskName + ")"; } } + + private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper { + + @Override + public TezThreadDumpHelper start(String name) { + // Do Nothing + return this; + } + + @Override + public void stop() { + // Do Nothing + } + } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 4a5235e8ec..8fb6f2afe4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -209,8 +209,6 @@ public TezTaskUmbilicalProtocol run() throws Exception { ownUmbilical = false; } TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit"); - - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(conf); } public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { @@ -233,8 +231,6 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, ContainerTask containerTask = null; try { containerTask = getTaskFuture.get(); - TezThreadDumpHelper.startPeriodicThreadDumpService(tezThreadDumpHelper, - containerTask.getTaskSpec().getTaskAttemptID().toString()); } catch (ExecutionException e) { error = true; Throwable cause = e.getCause(); @@ -298,6 +294,8 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, hadoopShim, sharedExecutor); boolean shouldDie; + tezThreadDumpHelper = + TezThreadDumpHelper.getInstance(containerTask.getTaskSpec().getTaskConf()).start(attemptId.toString()); try { TaskRunner2Result result = taskRunner.run(); LOG.info("TaskRunner2Result: {}", result); @@ -316,6 +314,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, e, "TaskExecutionFailure: " + e.getMessage()); } } finally { + tezThreadDumpHelper.stop(); FileSystem.closeAllForUGI(childUGI); } } @@ -432,7 +431,6 @@ public void shutdown() { } } - TezThreadDumpHelper.stopPeriodicThreadDumpService(tezThreadDumpHelper); TezRuntimeShutdownHandler.shutdown(); LOG.info("TezChild shutdown finished"); } From 34d38b86dcdee046ba093342cd0f4348d52450f6 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 6 Jul 2023 22:25:43 +0530 Subject: [PATCH 11/12] Fix Test. Change-Id: I0fdeea8971edce4ff2886c757a05306438b96893 --- .../main/java/org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../java/org/apache/tez/runtime/TezThreadDumpHelper.java | 3 ++- .../main/java/org/apache/tez/runtime/task/TezChild.java | 9 +++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 0e4a1df537..16d0a4dbd4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -341,7 +341,7 @@ public class DAGAppMaster extends AbstractService { Map services = new LinkedHashMap(); private ThreadLocalMap mdcContext; - private TezThreadDumpHelper tezThreadDumpHelper = null; + private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 8934dbd961..25d8609fc3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -45,6 +45,7 @@ public class TezThreadDumpHelper { + public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper(); private long duration = 0L; private Path basePath = null; private FileSystem fs = null; @@ -83,7 +84,7 @@ public static TezThreadDumpHelper getInstance(Configuration conf) { LOG.warn("Can not initialize periodic thread dump service", e); } } - return new NoopTezThreadDumpHelper(); + return NOOP_TEZ_THREAD_DUMP_HELPER; } public TezThreadDumpHelper start(String name) { diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 8fb6f2afe4..3145f21a58 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -120,7 +120,7 @@ public class TezChild { private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final String user; private final boolean updateSysCounters; - private TezThreadDumpHelper tezThreadDumpHelper = null; + private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; private Multimap startedInputsMap = HashMultimap.create(); private final boolean ownUmbilical; @@ -250,13 +250,15 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, } TezTaskAttemptID attemptId = containerTask.getTaskSpec().getTaskAttemptID(); + Configuration taskConf; if (containerTask.getTaskSpec().getTaskConf() != null) { Configuration copy = new Configuration(defaultConf); TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), copy); - + taskConf = copy; LoggingUtils.initLoggingContext(mdcContext, copy, attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString()); } else { + taskConf = defaultConf; LoggingUtils.initLoggingContext(mdcContext, defaultConf, attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString()); } @@ -294,8 +296,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, hadoopShim, sharedExecutor); boolean shouldDie; - tezThreadDumpHelper = - TezThreadDumpHelper.getInstance(containerTask.getTaskSpec().getTaskConf()).start(attemptId.toString()); + tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString()); try { TaskRunner2Result result = taskRunner.run(); LOG.info("TaskRunner2Result: {}", result); From d805772d83378a0d2f3664982f026586f276ba27 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Mon, 10 Jul 2023 13:31:53 +0530 Subject: [PATCH 12/12] Checkstyle Fix. Change-Id: I4bf6b2c82f6d1455f5f816a07dbcfcd80b603652 --- .../java/org/apache/tez/runtime/TezThreadDumpHelper.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 25d8609fc3..6f3e9fec1e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -127,9 +127,9 @@ private static class ThreadDumpCollector implements Runnable { @Override public void run() { if (!Thread.interrupted()) { - try (FSDataOutputStream fsStream = fs.create(new Path(path, - name + "_" + System.currentTimeMillis() + ".jstack")); PrintStream printStream = new PrintStream(fsStream, - false, "UTF8")) { + try (FSDataOutputStream fsStream = fs.create( + new Path(path, name + "_" + System.currentTimeMillis() + ".jstack")); + PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) { printThreadInfo(printStream, name); } catch (IOException e) { throw new RuntimeException(e);