getInitParameterNames() {
+ return null;
+ }
+
+ @Override
+ public String getInitParameter(String name) {
+ return null;
+ }
+ };
+ }
+
+ private void initServlet() {
+ try {
+ servlet.init(getDummyServletConfig(this.servlet.getClass().getSimpleName()));
+ } catch (ServletException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/common/web/ProfileOutputServlet.java b/tez-common/src/main/java/org/apache/tez/common/web/ProfileOutputServlet.java
new file mode 100644
index 0000000000..2fac77cdc8
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/web/ProfileOutputServlet.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.web;
+
+import org.apache.hadoop.yarn.webapp.MimeType;
+import org.eclipse.jetty.servlet.DefaultServlet;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Servlet to serve files generated by {@link ProfileServlet}.
+ */
+public class ProfileOutputServlet extends DefaultServlet {
+ public static final String FILE_QUERY_PARAM = "file";
+
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+ String queriedFile = request.getParameter(FILE_QUERY_PARAM);
+ if (queriedFile == null) {
+ writeMessage(response, "Run the profiler to be able to receive its output");
+ return;
+ }
+ File outputFile = new File(ProfileServlet.OUTPUT_DIR, queriedFile);
+ if (!outputFile.exists()) {
+ writeMessage(response, "Requested file does not exist: " + queriedFile);
+ return;
+ }
+ if (outputFile.length() < 100) {
+ response.setIntHeader("Refresh", 2);
+ writeMessage(response, "This page auto-refreshes every 2 seconds until output file is ready...");
+ return;
+ }
+ response.setContentType(MimeType.HTML);
+ response.getOutputStream().write(Files.readAllBytes(Paths.get(outputFile.getPath())));
+ response.getOutputStream().flush();
+ response.getOutputStream().close();
+ }
+
+ private void writeMessage(HttpServletResponse response, String message) throws IOException {
+ response.setContentType(MimeType.TEXT);
+ PrintWriter out = response.getWriter();
+ out.println(message);
+ out.close();
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/common/web/ProfileServlet.java b/tez-common/src/main/java/org/apache/tez/common/web/ProfileServlet.java
new file mode 100644
index 0000000000..1cdddfbf9c
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/web/ProfileServlet.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.web;
+
+import com.google.common.base.Joiner;
+
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.tez.common.TezUtilsInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ *
+ * Servlet that runs async-profiler as web-endpoint.
+ * Following options from async-profiler can be specified as query paramater.
+ * // -e event profiling event: cpu|alloc|lock|cache-misses etc.
+ * // -d duration run profiling for{@literal } seconds (integer)
+ * // -i interval sampling interval in nanoseconds (long)
+ * // -j jstackdepth maximum Java stack depth (integer)
+ * // -b bufsize frame buffer size (long)
+ * // -t profile different threads separately
+ * // -s simple class names instead of FQN
+ * // -o fmt[,fmt...] output format: summary|traces|flat|collapsed|svg|tree|jfr
+ * // --width px SVG width pixels (integer)
+ * // --height px SVG frame height pixels (integer)
+ * // --minwidth px skip frames smaller than px (double)
+ * // --reverse generate stack-reversed FlameGraph / Call tree
+ * Example:
+ * - To collect 30 second CPU profile of current process (returns FlameGraph svg)
+ * {@literal curl "http://localhost:10002/prof"}
+ * - To collect 1 minute CPU profile of current process and output in tree format (html)
+ * {@literal curl "http://localhost:10002/prof?output=tree&duration=60"}
+ * - To collect 30 second heap allocation profile of current process (returns FlameGraph svg)
+ * {@literal curl "http://localhost:10002/prof?event=alloc"}
+ * - To collect lock contention profile of current process (returns FlameGraph svg)
+ * {@literal curl "http://localhost:10002/prof?event=lock"}
+ * Following event types are supported (default is 'cpu') (NOTE: not all OS'es support all events)
+ * // Perf events:
+ * // cpu
+ * // page-faults
+ * // context-switches
+ * // cycles
+ * // instructions
+ * // cache-references
+ * // cache-misses
+ * // branches
+ * // branch-misses
+ * // bus-cycles
+ * // L1-dcache-load-misses
+ * // LLC-load-misses
+ * // dTLB-load-misses
+ * // mem:breakpoint
+ * // trace:tracepoint
+ * // Java events:
+ * // alloc
+ * // lock
+ *
+ */
+public class ProfileServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(ProfileServlet.class);
+ private static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
+ private static final String ALLOWED_METHODS = "GET";
+ private static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
+ private static final String CONTENT_TYPE_TEXT = "text/plain; charset=utf-8";
+ private static final String ASYNC_PROFILER_HOME_ENV = "ASYNC_PROFILER_HOME";
+ private static final String ASYNC_PROFILER_HOME_SYSTEM_PROPERTY = "async.profiler.home";
+ private static final String PROFILER_SCRIPT = "/profiler.sh";
+ private static final int DEFAULT_DURATION_SECONDS = 10;
+ private static final AtomicInteger ID_GEN = new AtomicInteger(0);
+ public static final String OUTPUT_DIR = System.getProperty("java.io.tmpdir") + "/prof-output";
+
+ enum Event {
+ CPU("cpu"),
+ ALLOC("alloc"),
+ LOCK("lock"),
+ PAGE_FAULTS("page-faults"),
+ CONTEXT_SWITCHES("context-switches"),
+ CYCLES("cycles"),
+ INSTRUCTIONS("instructions"),
+ CACHE_REFERENCES("cache-references"),
+ CACHE_MISSES("cache-misses"),
+ BRANCHES("branches"),
+ BRANCH_MISSES("branch-misses"),
+ BUS_CYCLES("bus-cycles"),
+ L1_DCACHE_LOAD_MISSES("L1-dcache-load-misses"),
+ LLC_LOAD_MISSES("LLC-load-misses"),
+ DTLB_LOAD_MISSES("dTLB-load-misses"),
+ MEM_BREAKPOINT("mem:breakpoint"),
+ TRACE_TRACEPOINT("trace:tracepoint");
+
+ private final String internalName;
+
+ Event(final String internalName) {
+ this.internalName = internalName;
+ }
+
+ public String getInternalName() {
+ return internalName;
+ }
+
+ public static Event fromInternalName(final String name) {
+ for (Event event : values()) {
+ if (event.getInternalName().equalsIgnoreCase(name)) {
+ return event;
+ }
+ }
+ return null;
+ }
+ }
+
+ enum Output {
+ SUMMARY, TRACES, FLAT, COLLAPSED, SVG, TREE, JFR
+ }
+
+ private final Lock profilerLock = new ReentrantLock();
+ private Integer pid;
+ private String asyncProfilerHome;
+ private transient Process process;
+
+ public ProfileServlet() {
+ this.asyncProfilerHome = getAsyncProfilerHome();
+ this.pid = TezUtilsInternal.getPid();
+ LOG.info("Servlet process PID: {} asyncProfilerHome: {}", pid, asyncProfilerHome);
+ }
+
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+ response.setContentType("text/plain; charset=UTF-8");
+ PrintStream out = new PrintStream(response.getOutputStream(), false, "UTF-8");
+ if (!HttpServer2.isInstrumentationAccessAllowed(this.getServletContext(), request, response)) {
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+ setResponseHeader(response);
+ out.println("Unauthorized: Instrumentation access is not allowed!");
+ out.close();
+ return;
+ }
+
+ // make sure async profiler home is set
+ if (asyncProfilerHome == null || asyncProfilerHome.trim().isEmpty()) {
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ setResponseHeader(response);
+ out.println("ASYNC_PROFILER_HOME env is not set");
+ out.close();
+ return;
+ }
+
+ // if pid is explicitly specified, use it else default to current process
+ pid = getInteger(request, "pid", pid);
+ // if pid is not specified in query param and if current process pid cannot be determined
+ if (pid == null) {
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ setResponseHeader(response);
+ out.println("'pid' query parameter unspecified or unable to determine PID of current process.");
+ out.close();
+ return;
+ }
+
+ final int duration = getInteger(request, "duration", DEFAULT_DURATION_SECONDS);
+ final Output output = getOutput(request);
+ final Event event = getEvent(request);
+ final Long interval = getLong(request, "interval");
+ final Integer jstackDepth = getInteger(request, "jstackdepth", null);
+ final Long bufsize = getLong(request, "bufsize");
+ final boolean thread = request.getParameterMap().containsKey("thread");
+ final boolean simple = request.getParameterMap().containsKey("simple");
+ final Integer width = getInteger(request, "width", null);
+ final Integer height = getInteger(request, "height", null);
+ final Double minwidth = getMinWidth(request);
+ final boolean reverse = request.getParameterMap().containsKey("reverse");
+ if (process == null || !process.isAlive()) {
+ try {
+ int lockTimeoutSecs = 3;
+ if (profilerLock.tryLock(lockTimeoutSecs, TimeUnit.SECONDS)) {
+ try {
+ File outputFile = new File(OUTPUT_DIR,
+ "async-prof-pid-" + pid + "-" + event.name().toLowerCase() + "-" + ID_GEN.incrementAndGet() + "."
+ + output.name().toLowerCase());
+ List cmd = new ArrayList<>();
+ cmd.add(asyncProfilerHome + PROFILER_SCRIPT);
+ cmd.add("-e");
+ cmd.add(event.getInternalName());
+ cmd.add("-d");
+ cmd.add("" + duration);
+ cmd.add("-o");
+ cmd.add(output.name().toLowerCase());
+ cmd.add("-f");
+ cmd.add(outputFile.getAbsolutePath());
+ if (interval != null) {
+ cmd.add("-i");
+ cmd.add(interval.toString());
+ }
+ if (jstackDepth != null) {
+ cmd.add("-j");
+ cmd.add(jstackDepth.toString());
+ }
+ if (bufsize != null) {
+ cmd.add("-b");
+ cmd.add(bufsize.toString());
+ }
+ if (thread) {
+ cmd.add("-t");
+ }
+ if (simple) {
+ cmd.add("-s");
+ }
+ if (width != null) {
+ cmd.add("--width");
+ cmd.add(width.toString());
+ }
+ if (height != null) {
+ cmd.add("--height");
+ cmd.add(height.toString());
+ }
+ if (minwidth != null) {
+ cmd.add("--minwidth");
+ cmd.add(minwidth.toString());
+ }
+ if (reverse) {
+ cmd.add("--reverse");
+ }
+ cmd.add(pid.toString());
+ process = new ProcessBuilder(cmd).start();
+
+ // set response and set refresh header to output location
+ setResponseHeader(response);
+ response.setStatus(HttpServletResponse.SC_ACCEPTED);
+ String relativeUrl = "/prof-output";
+ // to avoid auto-refresh by ProfileOutputServlet, refreshDelay can be specified via url param
+ int refreshDelay = getInteger(request, "refreshDelay", 0);
+ // instead of sending redirect, set auto-refresh so that browsers will refresh with redirected url
+ response.setHeader("Refresh", (duration + refreshDelay) + "; URL=" + relativeUrl + '?'
+ + ProfileOutputServlet.FILE_QUERY_PARAM + '=' + outputFile.getName());
+
+ out.println("Profiled PID: " + pid);
+ out.println("Started [" + event.getInternalName()
+ + "] profiling. This page will automatically redirect to "
+ + relativeUrl + " after " + duration + " seconds.\n\ncommand:\n" + Joiner.on(" ").join(cmd));
+ out.flush();
+ } finally {
+ profilerLock.unlock();
+ }
+ } else {
+ setResponseHeader(response);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ out.println("Unable to acquire lock. Another instance of profiler might be running.");
+ LOG.warn("Unable to acquire lock in {} seconds. Another instance of profiler might be running.",
+ lockTimeoutSecs);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while acquiring profile lock.", e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ } else {
+ setResponseHeader(response);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ out.println("Another instance of profiler is already running.");
+ }
+ out.close();
+ }
+
+ private Integer getInteger(final HttpServletRequest req, final String param, final Integer defaultValue) {
+ final String value = req.getParameter(param);
+ if (value != null) {
+ try {
+ return Integer.valueOf(value);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+ return defaultValue;
+ }
+
+ private Long getLong(final HttpServletRequest req, final String param) {
+ final String value = req.getParameter(param);
+ if (value != null) {
+ try {
+ return Long.valueOf(value);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+ return null;
+ }
+
+ private Double getMinWidth(final HttpServletRequest req) {
+ final String value = req.getParameter("minwidth");
+ if (value != null) {
+ try {
+ return Double.valueOf(value);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+ return null;
+ }
+
+ private Event getEvent(final HttpServletRequest req) {
+ final String eventArg = req.getParameter("event");
+ if (eventArg != null) {
+ Event event = Event.fromInternalName(eventArg);
+ return event == null ? Event.CPU : event;
+ }
+ return Event.CPU;
+ }
+
+ private Output getOutput(final HttpServletRequest req) {
+ final String outputArg = req.getParameter("output");
+ if (outputArg != null) {
+ try {
+ return Output.valueOf(outputArg.trim().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Output format value is invalid, returning with default SVG");
+ return Output.SVG;
+ }
+ }
+ return Output.SVG;
+ }
+
+ private void setResponseHeader(final HttpServletResponse response) {
+ response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS);
+ response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+ response.setContentType(CONTENT_TYPE_TEXT);
+ }
+
+ public static String getAsyncProfilerHome() {
+ String asyncProfilerHome = System.getenv(ASYNC_PROFILER_HOME_ENV);
+ // if ENV is not set, see if -Dasync.profiler.home=/path/to/async/profiler/home is set
+ if (asyncProfilerHome == null || asyncProfilerHome.trim().isEmpty()) {
+ asyncProfilerHome = System.getProperty(ASYNC_PROFILER_HOME_SYSTEM_PROPERTY);
+ }
+ return asyncProfilerHome;
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/common/web/ServletToControllerAdapters.java b/tez-common/src/main/java/org/apache/tez/common/web/ServletToControllerAdapters.java
new file mode 100644
index 0000000000..304e9a9118
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/web/ServletToControllerAdapters.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.web;
+
+import javax.servlet.ServletException;
+
+import org.apache.hadoop.conf.ConfServlet;
+import org.apache.hadoop.http.HttpServer2.StackServlet;
+import org.apache.hadoop.jmx.JMXJsonServlet;
+
+public class ServletToControllerAdapters {
+ public static class JMXJsonServletController extends AbstractServletToControllerAdapter {
+ public JMXJsonServletController() throws ServletException {
+ this.servlet = new JMXJsonServlet();
+ }
+ }
+
+ public static class ConfServletController extends AbstractServletToControllerAdapter {
+ public ConfServletController() throws ServletException {
+ this.servlet = new ConfServlet();
+ }
+ }
+
+ public static class StackServletController extends AbstractServletToControllerAdapter {
+ public StackServletController() throws ServletException {
+ this.servlet = new StackServlet();
+ }
+ }
+
+ public static class ProfileServletController extends AbstractServletToControllerAdapter {
+ public ProfileServletController() throws ServletException {
+ this.servlet = new ProfileServlet();
+ }
+ }
+
+ public static class ProfileOutputServletController extends AbstractServletToControllerAdapter {
+ public ProfileOutputServletController() throws ServletException {
+ this.servlet = new ProfileOutputServlet();
+ }
+ }
+
+}
diff --git a/tez-common/src/main/java/org/apache/tez/common/web/package-info.java b/tez-common/src/main/java/org/apache/tez/common/web/package-info.java
new file mode 100644
index 0000000000..2fbda31fda
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/web/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.common.web;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/DAGIDAware.java b/tez-common/src/main/java/org/apache/tez/dag/records/DAGIDAware.java
new file mode 100644
index 0000000000..1234a30053
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/DAGIDAware.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public interface DAGIDAware {
+ TezDAGID getDAGID();
+
+ default ApplicationId getApplicationId() {
+ return getDAGID().getApplicationId();
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIDAware.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIDAware.java
new file mode 100644
index 0000000000..924fd07109
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIDAware.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public interface TaskAttemptIDAware extends TaskIDAware {
+ TezTaskAttemptID getTaskAttemptID();
+
+ @Override
+ default TezTaskID getTaskID() {
+ return getTaskAttemptID().getTaskID();
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskIDAware.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIDAware.java
new file mode 100644
index 0000000000..0bee45dfa1
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIDAware.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public interface TaskIDAware extends VertexIDAware {
+ TezTaskID getTaskID();
+
+ @Override
+ default TezVertexID getVertexID() {
+ return getTaskID().getVertexID();
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index 24365c95e2..c46aa6088b 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -21,13 +21,15 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Objects;
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
/**
* TezDAGID represents the immutable and unique identifier for
* a Tez DAG.
@@ -40,25 +42,21 @@
*/
public class TezDAGID extends TezID {
- private static TezIDCache tezDAGIDCache = new TezIDCache<>();
+ private static Interner tezDAGIDCache = Interners.newWeakInterner();
private ApplicationId applicationId;
/**
* Get a DAGID object from given {@link ApplicationId}.
* @param applicationId Application that this dag belongs to
* @param id the dag number
+ * @throws NullPointerException if {@code obj} is {@code applicationId}
*/
public static TezDAGID getInstance(ApplicationId applicationId, int id) {
// The newly created TezDAGIds are primarily for their hashCode method, and
// will be short-lived.
// Alternately the cache can be keyed by the hash of the incoming paramters.
- Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null");
- return tezDAGIDCache.getInstance(new TezDAGID(applicationId, id));
- }
-
- @InterfaceAudience.Private
- public static void clearCache() {
- tezDAGIDCache.clear();
+ Objects.requireNonNull(applicationId, "ApplicationID cannot be null");
+ return tezDAGIDCache.intern(new TezDAGID(applicationId, id));
}
/**
@@ -66,15 +64,16 @@ public static void clearCache() {
* @param yarnRMIdentifier YARN RM identifier
* @param appId application number
* @param id the dag number
+ * @throws NullPointerException if {@code yarnRMIdentifier} is {@code null}
*/
public static TezDAGID getInstance(String yarnRMIdentifier, int appId, int id) {
// The newly created TezDAGIds are primarily for their hashCode method, and
// will be short-lived.
// Alternately the cache can be keyed by the hash of the incoming paramters.
- Preconditions.checkArgument(yarnRMIdentifier != null, "yarnRMIdentifier cannot be null");
- return tezDAGIDCache.getInstance(new TezDAGID(yarnRMIdentifier, appId, id));
+ Objects.requireNonNull(yarnRMIdentifier, "yarnRMIdentifier cannot be null");
+ return tezDAGIDCache.intern(new TezDAGID(yarnRMIdentifier, appId, id));
}
-
+
// Public for Writable serialization. Verify if this is actually required.
public TezDAGID() {
}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
index cd7b27de45..7efbd9a889 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
@@ -21,8 +21,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.util.WeakHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -42,25 +40,6 @@ public abstract class TezID implements WritableComparable {
public static final char SEPARATOR = '_';
protected int id;
- public static class TezIDCache {
- private final WeakHashMap> cache = new WeakHashMap<>();
-
- synchronized T getInstance(final T id) {
- final WeakReference cached = cache.get(id);
- if (cached != null) {
- final T value = cached.get();
- if (value != null)
- return value;
- }
- cache.put(id, new WeakReference(id));
- return id;
- }
-
- synchronized void clear() {
- cache.clear();
- }
- }
-
/** constructs an ID object from the given int */
public TezID(int id) {
this.id = id;
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
index 7aee80f4d6..fe2b84449f 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
@@ -21,10 +21,14 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
/**
* TezTaskAttemptID represents the immutable and unique identifier for
* a task attempt. Each task attempt is one particular instance of a Tez Task
@@ -42,11 +46,11 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class TezTaskAttemptID extends TezID {
+public class TezTaskAttemptID extends TezID implements TaskIDAware {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;
- private static TezIDCache tezTaskAttemptIDCache = new TezIDCache<>();
+ private static Interner tezTaskAttemptIDCache = Interners.newWeakInterner();
// Public for Writable serialization. Verify if this is actually required.
public TezTaskAttemptID() {
@@ -56,25 +60,20 @@ public TezTaskAttemptID() {
* Constructs a TaskAttemptID object from given {@link TezTaskID}.
* @param taskID TaskID that this task belongs to
* @param id the task attempt number
+ * @throws NullPointerException if {@code taskID} is {@code null}
*/
public static TezTaskAttemptID getInstance(TezTaskID taskID, int id) {
- return tezTaskAttemptIDCache.getInstance(new TezTaskAttemptID(taskID, id));
- }
-
- @InterfaceAudience.Private
- public static void clearCache() {
- tezTaskAttemptIDCache.clear();
+ Objects.requireNonNull(taskID);
+ return tezTaskAttemptIDCache.intern(new TezTaskAttemptID(taskID, id));
}
private TezTaskAttemptID(TezTaskID taskId, int id) {
super(id);
- if(taskId == null) {
- throw new IllegalArgumentException("taskId cannot be null");
- }
this.taskId = taskId;
}
/** Returns the {@link TezTaskID} object that this task attempt belongs to */
+ @Override
public TezTaskID getTaskID() {
return taskId;
}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index 15b695c7a5..08310f3dfc 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -21,13 +21,16 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
/**
* TaskID represents the immutable and unique identifier for
* a Tez Task. Each TaskID encompasses multiple attempts made to
@@ -38,7 +41,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class TezTaskID extends TezID {
+public class TezTaskID extends TezID implements VertexIDAware {
public static final String TASK = "task";
private final int serializingHash;
@@ -51,27 +54,22 @@ public FastNumberFormat initialValue() {
}
};
- private static TezIDCache tezTaskIDCache = new TezIDCache<>();
+ private static Interner tezTaskIDCache = Interners.newWeakInterner();
private TezVertexID vertexId;
/**
* Constructs a TezTaskID object from given {@link TezVertexID}.
* @param vertexID the vertexID object for this TezTaskID
* @param id the tip number
+ * @throws NullPointerException if {@code vertexID} is {@code null}
*/
public static TezTaskID getInstance(TezVertexID vertexID, int id) {
- Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
- return tezTaskIDCache.getInstance(new TezTaskID(vertexID, id));
- }
-
- @InterfaceAudience.Private
- public static void clearCache() {
- tezTaskIDCache.clear();
+ Objects.requireNonNull(vertexID, "vertexID cannot be null");
+ return tezTaskIDCache.intern(new TezTaskID(vertexID, id));
}
private TezTaskID(TezVertexID vertexID, int id) {
super(id);
- Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
this.vertexId = vertexID;
this.serializingHash = getHashCode(true);
}
@@ -81,6 +79,7 @@ public int getSerializingHash() {
}
/** Returns the {@link TezVertexID} object that this task belongs to */
+ @Override
public TezVertexID getVertexID() {
return vertexId;
}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index b5a36abe33..f7becc250f 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -21,13 +21,16 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tez.common.Preconditions;
import org.apache.tez.util.FastNumberFormat;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
/**
* TezVertexID represents the immutable and unique identifier for
* a Vertex in a Tez DAG. Each TezVertexID encompasses multiple Tez Tasks.
@@ -41,7 +44,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class TezVertexID extends TezID {
+public class TezVertexID extends TezID implements DAGIDAware {
public static final String VERTEX = "vertex";
static final ThreadLocal tezVertexIdFormat = new ThreadLocal() {
@@ -53,7 +56,7 @@ public FastNumberFormat initialValue() {
}
};
- private static TezIDCache tezVertexIDCache = new TezIDCache<>();
+ private static Interner tezVertexIDCache = Interners.newWeakInterner();
private TezDAGID dagId;
// Public for Writable serialization. Verify if this is actually required.
@@ -64,15 +67,11 @@ public TezVertexID() {
* Constructs a TezVertexID object from given {@link TezDAGID}.
* @param dagId TezDAGID object for this TezVertexID
* @param id the tip number
+ * @throws NullPointerException if {@code dagId} is {@code null}
*/
public static TezVertexID getInstance(TezDAGID dagId, int id) {
- Preconditions.checkArgument(dagId != null, "DagID cannot be null");
- return tezVertexIDCache.getInstance(new TezVertexID(dagId, id));
- }
-
- @InterfaceAudience.Private
- public static void clearCache() {
- tezVertexIDCache.clear();
+ Objects.requireNonNull(dagId, "DagID cannot be null");
+ return tezVertexIDCache.intern(new TezVertexID(dagId, id));
}
private TezVertexID(TezDAGID dagId, int id) {
@@ -81,7 +80,8 @@ private TezVertexID(TezDAGID dagId, int id) {
}
/** Returns the {@link TezDAGID} object that this tip belongs to */
- public TezDAGID getDAGId() {
+ @Override
+ public TezDAGID getDAGID() {
return dagId;
}
@@ -160,5 +160,4 @@ public static TezVertexID fromString(String vertexIdStr) {
}
return null;
}
-
}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/VertexIDAware.java b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIDAware.java
new file mode 100644
index 0000000000..01bbe859b6
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIDAware.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public interface VertexIDAware extends DAGIDAware {
+ TezVertexID getVertexID();
+
+ @Override
+ default TezDAGID getDAGID() {
+ return getVertexID().getDAGID();
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
index 4480f742fc..83b503203c 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
@@ -30,7 +30,7 @@ public class VertexIdentifierImpl implements VertexIdentifier {
public VertexIdentifierImpl(String dagName, String vertexName, TezVertexID vertexId) {
this.vertexId = vertexId;
this.vertexName = vertexName;
- this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGId());
+ this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGID());
}
@Override
diff --git a/tez-common/src/main/java/org/apache/tez/util/LoggingUtils.java b/tez-common/src/main/java/org/apache/tez/util/LoggingUtils.java
new file mode 100644
index 0000000000..e09b6b0964
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/util/LoggingUtils.java
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Hashtable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.helpers.ThreadLocalMap;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class LoggingUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingUtils.class);
+
+ private LoggingUtils() {}
+
+ @SuppressWarnings("unchecked")
+ public static void initLoggingContext(ThreadLocalMap threadLocalMap, Configuration conf,
+ String dagId, String taskAttemptId) {
+ Hashtable data = (Hashtable) threadLocalMap.get();
+ if (data == null) {
+ data = new NonClonableHashtable();
+ threadLocalMap.set(data);
+ }
+ data.put("dagId", dagId == null ? "" : dagId);
+ data.put("taskAttemptId", taskAttemptId == null ? "" : taskAttemptId);
+
+ String[] mdcKeys = conf.getStrings(TezConfiguration.TEZ_MDC_CUSTOM_KEYS,
+ TezConfiguration.TEZ_MDC_CUSTOM_KEYS_DEFAULT);
+
+ if (mdcKeys == null || mdcKeys.length == 0) {
+ return;
+ }
+
+ String[] mdcKeysValuesFrom = conf.getStrings(TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS,
+ TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS_DEFAULT);
+ LOG.info("MDC_LOGGING: setting up MDC keys: keys: {} / conf: {}", Arrays.asList(mdcKeys),
+ Arrays.asList(mdcKeysValuesFrom));
+
+ int i = 0;
+ for (String mdcKey : mdcKeys) {
+ // don't want to fail on incorrect mdc key settings, but warn in app logs
+ if (mdcKey.isEmpty() || mdcKeysValuesFrom.length < i + 1) {
+ LOG.warn("cannot set mdc key: {}", mdcKey);
+ break;
+ }
+
+ String mdcValue = mdcKeysValuesFrom[i] == null ? "" : conf.get(mdcKeysValuesFrom[i]);
+ // MDC is backed by a Hashtable, let's prevent NPE because of null values
+ if (mdcValue != null) {
+ data.put(mdcKey, mdcValue);
+ } else {
+ LOG.warn("MDC_LOGGING: mdc value is null for key: {}, config key: {}", mdcKey,
+ mdcKeysValuesFrom[i]);
+ }
+
+ i++;
+ }
+ }
+
+ public static String getPatternForAM(Configuration conf) {
+ String pattern =
+ conf.get(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_AM, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT);
+ return pattern.isEmpty() ? null : pattern;
+ }
+
+ public static String getPatternForTask(Configuration conf) {
+ String pattern =
+ conf.get(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_TASK, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT);
+ return pattern.isEmpty() ? null : pattern;
+ }
+
+ /**
+ * This method is for setting a NonClonableHashtable into log4j's mdc. Reflection hacks are
+ * needed, because MDC.mdc is well protected (final static MDC mdc = new MDC();). The logic below
+ * is supposed to be called once per JVM, so it's not a subject to performance bottlenecks. For
+ * further details of this solution, please check NonClonableHashtable class, which is set into
+ * the ThreadLocalMap. A wrong outcome of this method (any kind of runtime/reflection problems)
+ * should not affect the DAGAppMaster/TezChild. In case of an exception a ThreadLocalMap is
+ * returned, but it won't affect the content of the MDC.
+ */
+ @SuppressWarnings("unchecked")
+ public static ThreadLocalMap setupLog4j() {
+ ThreadLocalMap mdcContext = new ThreadLocalMap();
+ mdcContext.set(new NonClonableHashtable());
+
+ try {
+ final Constructor>[] constructors = org.apache.log4j.MDC.class.getDeclaredConstructors();
+ for (Constructor> c : constructors) {
+ c.setAccessible(true);
+ }
+
+ org.apache.log4j.MDC mdc = (org.apache.log4j.MDC) constructors[0].newInstance();
+ Field tlmField = org.apache.log4j.MDC.class.getDeclaredField("tlm");
+ tlmField.setAccessible(true);
+ tlmField.set(mdc, mdcContext);
+
+ Field mdcField = org.apache.log4j.MDC.class.getDeclaredField("mdc");
+ mdcField.setAccessible(true);
+
+ Field modifiers = Field.class.getDeclaredField("modifiers");
+ modifiers.setAccessible(true);
+ modifiers.setInt(mdcField, mdcField.getModifiers() & ~Modifier.FINAL);
+
+ mdcField.set(null, mdc);
+
+ } catch (Exception e) {
+ LOG.warn("Cannot set log4j global MDC, mdcContext won't be applied to log4j's MDC class", e);
+ }
+
+ return mdcContext;
+ }
+
+ /**
+ * NonClonableHashtable is a special class for hacking the log4j MDC context. By design, log4j's
+ * MDC uses a ThreadLocalMap, which clones parent thread's context before propagating it to child
+ * thread (see: @see {@link org.apache.log4j.helpers.ThreadLocalMap#childValue()}). In our
+ * usecase, this is not suitable, as we want to maintain only one context globally (and set e.g.
+ * dagId, taskAttemptId), then update it as easy as possible when dag/taskattempt changes, without
+ * having to propagate the update parameters to all the threads in the JVM.
+ */
+ private static class NonClonableHashtable extends Hashtable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public synchronized Object clone() {
+ return this;
+ }
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/util/StringInterner.java b/tez-common/src/main/java/org/apache/tez/util/StringInterner.java
new file mode 100644
index 0000000000..b8c911307c
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/util/StringInterner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.util;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
+/**
+ * A class to replace the {@code String.intern()}. The {@code String.intern()}
+ * has some well-known performance limitations, and should generally be avoided.
+ * Prefer Google's interner over the JDK's implementation.
+ */
+public final class StringInterner {
+
+ private static final Interner STRING_INTERNER =
+ Interners.newWeakInterner();
+
+ private StringInterner() {
+ }
+
+ public static String intern(final String str) {
+ return (str == null) ? null : STRING_INTERNER.intern(str);
+ }
+}
diff --git a/tez-common/src/main/java/org/apache/tez/util/TezRuntimeShutdownHandler.java b/tez-common/src/main/java/org/apache/tez/util/TezRuntimeShutdownHandler.java
new file mode 100644
index 0000000000..4881e08ab9
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/util/TezRuntimeShutdownHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class TezRuntimeShutdownHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(TezRuntimeShutdownHandler.class);
+
+ private static final List shutdownTasks = new ArrayList<>();
+
+ private TezRuntimeShutdownHandler() {
+ }
+
+ public static void addShutdownTask(Runnable r) {
+ shutdownTasks.add(r);
+ }
+
+ public static synchronized void shutdown() {
+ LOG.info("Handling {} shutdown tasks", shutdownTasks.size());
+ for (Runnable shutdownTask : shutdownTasks) {
+ shutdownTask.run();
+ }
+ }
+}
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java
deleted file mode 100644
index a9cecc216a..0000000000
--- a/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-public class TestEnvironmentUpdateUtils {
-
- @Test(timeout = 5000)
- public void testMultipleUpdateEnvironment() {
- EnvironmentUpdateUtils.put("test.environment1", "test.value1");
- EnvironmentUpdateUtils.put("test.environment2", "test.value2");
- assertEquals("Environment was not set propertly", "test.value1", System.getenv("test.environment1"));
- assertEquals("Environment was not set propertly", "test.value2", System.getenv("test.environment2"));
- }
-
- @Test(timeout = 5000)
- public void testConcurrentRequests() throws InterruptedException {
- int timeoutSecond = 5;
- int concurThread = 10;
- int exceptionCount = 0;
- List> tasks = new ArrayList>();
- List> pendingTasks = new ArrayList>();
- final ExecutorService callbackExecutor = Executors.newFixedThreadPool(concurThread,
- new ThreadFactoryBuilder().setDaemon(false).setNameFormat("CallbackExecutor").build());
- ListeningExecutorService taskExecutorService =
- MoreExecutors.listeningDecorator(callbackExecutor);
- while(concurThread > 0){
- ListenableFuture