Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2296,4 +2296,12 @@ static Set<String> getPropertySet() {
@ConfigurationProperty
public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";

/**
* Frequency at which thread dump should be captured. Supports TimeUnits.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";

}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.util.LoggingUtils;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.codehaus.jettison.json.JSONException;
Expand Down Expand Up @@ -340,6 +341,7 @@ public class DAGAppMaster extends AbstractService {
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
private ThreadLocalMap mdcContext;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Expand Down Expand Up @@ -766,6 +768,7 @@ protected synchronized void handle(DAGAppMasterEvent event) {
"DAGAppMaster Internal Error occurred");
break;
case DAG_FINISHED:
tezThreadDumpHelper.stop();
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
Expand Down Expand Up @@ -2201,6 +2204,9 @@ public Void run() throws Exception {
execService.shutdownNow();
}

// Check if the thread dump service is up in any case, if yes attempt a shutdown
tezThreadDumpHelper.stop();

super.serviceStop();
}

Expand Down Expand Up @@ -2577,6 +2583,8 @@ void stopVertexServices(DAG dag) {
private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
throws TezException {
currentDAG = dag;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());

// Try localizing the actual resources.
List<URL> additionalUrlsForClasspath;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
import org.apache.tez.common.TezContainerLogAppender;
import org.apache.tez.dag.api.TezConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT;

public class TezThreadDumpHelper {

public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper();
private long duration = 0L;
private Path basePath = null;
private FileSystem fs = null;

private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean();
private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class);

private ScheduledExecutorService periodicThreadDumpServiceExecutor;

private TezThreadDumpHelper(long duration, Configuration conf) throws IOException {
this.duration = duration;
Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME);
if (appender instanceof TezContainerLogAppender) {
this.basePath = new Path(((TezContainerLogAppender) appender).getContainerLogDir());
this.fs = FileSystem.getLocal(conf);
} else {
// Fallback, if it is any other appender or if none is configured.
this.basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.fs = this.basePath.getFileSystem(conf);
}
LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " +
"path: {}", duration, basePath);
}

public TezThreadDumpHelper() {
}

public static TezThreadDumpHelper getInstance(Configuration conf) {
long periodicThreadDumpFrequency =
conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);

if (periodicThreadDumpFrequency > 0) {
try {
return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
} catch (IOException e) {
LOG.warn("Can not initialize periodic thread dump service", e);
}
}
return NOOP_TEZ_THREAD_DUMP_HELPER;
}

public TezThreadDumpHelper start(String name) {
periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PeriodicThreadDumpService{" + name + "} #%d")
.build());
Runnable threadDumpCollector = new ThreadDumpCollector(basePath, name, fs);
periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, TimeUnit.MILLISECONDS);
return this;
}

public void stop() {
if (periodicThreadDumpServiceExecutor != null) {
periodicThreadDumpServiceExecutor.shutdown();

try {
if (!periodicThreadDumpServiceExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
periodicThreadDumpServiceExecutor.shutdownNow();
}
} catch (InterruptedException ignored) {
// Ignore interrupt, will attempt a final shutdown below.
}
periodicThreadDumpServiceExecutor.shutdownNow();
periodicThreadDumpServiceExecutor = null;
}
}

private static class ThreadDumpCollector implements Runnable {

private final Path path;
private final String name;
private final FileSystem fs;

ThreadDumpCollector(Path path, String name, FileSystem fs) {
this.path = path;
this.fs = fs;
this.name = name;
}

@Override
public void run() {
if (!Thread.interrupted()) {
try (FSDataOutputStream fsStream = fs.create(
new Path(path, name + "_" + System.currentTimeMillis() + ".jstack"));
PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) {
printThreadInfo(printStream, name);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

public synchronized void printThreadInfo(PrintStream stream, String title) {
boolean contention = THREAD_BEAN.isThreadContentionMonitoringEnabled();
long[] threadIds = THREAD_BEAN.getAllThreadIds();
stream.println("Process Thread Dump: " + title);
stream.println(threadIds.length + " active threads");
for (long tid : threadIds) {
ThreadInfo info = THREAD_BEAN.getThreadInfo(tid, Integer.MAX_VALUE);
if (info == null) {
stream.println(" Inactive");
continue;
}
stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
Thread.State state = info.getThreadState();
stream.println(" State: " + state);
stream.println(" Blocked count: " + info.getBlockedCount());
stream.println(" Waited count: " + info.getWaitedCount());
if (contention) {
stream.println(" Blocked time: " + info.getBlockedTime());
stream.println(" Waited time: " + info.getWaitedTime());
}
if (state == Thread.State.WAITING) {
stream.println(" Waiting on " + info.getLockName());
} else if (state == Thread.State.BLOCKED) {
stream.println(" Blocked on " + info.getLockName());
stream.println(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
}
stream.println(" Stack:");
for (StackTraceElement frame : info.getStackTrace()) {
stream.println(" " + frame.toString());
}
}
stream.flush();
}

private String getTaskName(long id, String taskName) {
if (taskName == null) {
return Long.toString(id);
}
return id + " (" + taskName + ")";
}
}

private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {

@Override
public TezThreadDumpHelper start(String name) {
// Do Nothing
return this;
}

@Override
public void stop() {
// Do Nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class TezChild {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final String user;
private final boolean updateSysCounters;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;

private Multimap<String, String> startedInputsMap = HashMultimap.create();
private final boolean ownUmbilical;
Expand Down Expand Up @@ -178,7 +180,7 @@ public TezChild(Configuration conf, String host, int port, String containerIdent
if (LOG.isDebugEnabled()) {
LOG.debug("Executing with tokens:");
for (Token<?> token : credentials.getAllTokens()) {
LOG.debug("",token);
LOG.debug("{}", token);
}
}

Expand Down Expand Up @@ -248,13 +250,15 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
}

TezTaskAttemptID attemptId = containerTask.getTaskSpec().getTaskAttemptID();
Configuration taskConf;
if (containerTask.getTaskSpec().getTaskConf() != null) {
Configuration copy = new Configuration(defaultConf);
TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), copy);

taskConf = copy;
LoggingUtils.initLoggingContext(mdcContext, copy,
attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString());
} else {
taskConf = defaultConf;
LoggingUtils.initLoggingContext(mdcContext, defaultConf,
attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString());
}
Expand Down Expand Up @@ -292,6 +296,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
hadoopShim, sharedExecutor);

boolean shouldDie;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
try {
TaskRunner2Result result = taskRunner.run();
LOG.info("TaskRunner2Result: {}", result);
Expand All @@ -310,6 +315,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
e, "TaskExecutionFailure: " + e.getMessage());
}
} finally {
tezThreadDumpHelper.stop();
FileSystem.closeAllForUGI(childUGI);
}
}
Expand Down Expand Up @@ -425,6 +431,7 @@ public void shutdown() {
RPC.stopProxy(umbilical);
}
}

TezRuntimeShutdownHandler.shutdown();
LOG.info("TezChild shutdown finished");
}
Expand Down
Loading