From d92fac8994673736c3ef5d5c6dc0fcf7f1f3cf97 Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 18 Dec 2023 21:36:47 +0900 Subject: [PATCH 1/8] TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts --- .../apache/tez/dag/api/TezConfiguration.java | 21 +++++++++++++-- .../apache/tez/runtime/hook/TezDAGHook.java | 26 +++++++++++++++++++ .../tez/runtime/hook/TezTaskAttemptHook.java | 26 +++++++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 21 +++++++++++---- .../apache/tez/dag/app/ThreadDumpDAGHook.java | 23 ++++++++++++++++ .../org/apache/tez/runtime/task/TezChild.java | 16 +++++++++--- .../task/ThreadDumpTaskAttemptHook.java | 23 ++++++++++++++++ 7 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java create mode 100644 tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 7e86853631..8862f4b7d6 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2297,12 +2297,14 @@ static Set getPropertySet() { public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties"; /** - * Frequency at which thread dump should be captured. Supports TimeUnits. + * Frequency at which thread dump should be captured. Supports TimeUnits. This is effective only + * when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to tez.am.hooks or + * org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to tez.task.attempt.hooks. */ @ConfigurationScope(Scope.DAG) @ConfigurationProperty public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; - public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms"; + public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms"; /** * Limits the amount of data that can be written to LocalFileSystem by a Task. @@ -2312,4 +2314,19 @@ static Set getPropertySet() { public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes"; public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1; + /** + * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook. + * e.g. org.apache.tez.dag.app.ThreadDumpDAGHook + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks"; + + /** + * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook. + * e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook + */ + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty + public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks"; } diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java new file mode 100644 index 0000000000..c9ce95810f --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java @@ -0,0 +1,26 @@ +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezDAGID; + +/** + * A hook which is instantiated and triggered before and after a DAG is exeucted. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TezDAGHook { + /** + * Invoked before the DAG starts. + * + * @param id the DAG id + * @param conf the conf + */ + void start(TezDAGID id, Configuration conf); + + /** + * Invoked after the DAG finishes. + */ + void stop(); +} diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java new file mode 100644 index 0000000000..a83cb326bd --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java @@ -0,0 +1,26 @@ +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezTaskAttemptID; + +/** + * A hook which is instantiated and triggered before and after a task attempt is executed. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TezTaskAttemptHook { + /** + * Invoked before the task attempt starts. + * + * @param id the task attempt id + * @param conf the conf + */ + void start(TezTaskAttemptID id, Configuration conf); + + /** + * Invoked after the task attempt finishes. + */ + void stop(); +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 9c7cc18b60..2656c04490 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -71,6 +71,7 @@ import org.apache.tez.Utils; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.SessionNotRunning; @@ -187,7 +188,7 @@ import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; -import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezDAGHook; import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezMxBeanResourceCalculator; import org.codehaus.jettison.json.JSONException; @@ -343,7 +344,7 @@ public class DAGAppMaster extends AbstractService { Map services = new LinkedHashMap(); private ThreadLocalMap mdcContext; - private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; + private TezDAGHook[] hooks = {}; public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, @@ -770,7 +771,9 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: - tezThreadDumpHelper.stop(); + for (TezDAGHook hook : hooks) { + hook.stop(); + } DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); @@ -2227,7 +2230,9 @@ public Void run() throws Exception { } // Check if the thread dump service is up in any case, if yes attempt a shutdown - tezThreadDumpHelper.stop(); + for (TezDAGHook hook : hooks) { + hook.stop(); + } super.serviceStop(); } @@ -2599,7 +2604,13 @@ private void countHeldContainers(DAG newDAG) { private void startDAGExecution(DAG dag, final Map additionalAmResources) throws TezException { currentDAG = dag; - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString()); + final Configuration conf = dag.getConf(); + final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]); + final TezDAGHook[] hooks = new TezDAGHook[hookClasses.length]; + for (int i = 0; i < hooks.length; i++) { + hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); + hooks[i].start(dag.getID(), conf); + } // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java new file mode 100644 index 0000000000..0f0e697f4d --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java @@ -0,0 +1,23 @@ +package org.apache.tez.dag.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezDAGHook; + +/** + * A DAG hook which dumps thread information periodically. + */ +public class ThreadDumpDAGHook implements TezDAGHook { + private TezThreadDumpHelper helper; + + @Override + public void start(TezDAGID id, Configuration conf) { + helper = TezThreadDumpHelper.getInstance(conf).start(id.toString()); + } + + @Override + public void stop() { + helper.stop(); + } +} diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 98b07100a8..ed14bd880c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -52,6 +52,7 @@ import org.apache.log4j.helpers.ThreadLocalMap; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezLocalResource; @@ -69,10 +70,10 @@ import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; -import org.apache.tez.runtime.TezThreadDumpHelper; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.hook.TezTaskAttemptHook; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.util.LoggingUtils; @@ -120,7 +121,6 @@ public class TezChild { private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final String user; private final boolean updateSysCounters; - private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; private Multimap startedInputsMap = HashMultimap.create(); private final boolean ownUmbilical; @@ -295,7 +295,13 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, hadoopShim, sharedExecutor); boolean shouldDie; - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString()); + final String[] hookClasses = taskConf + .getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]); + final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length]; + for (int i = 0; i < hooks.length; i++) { + hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); + hooks[i].start(attemptId, taskConf); + } try { TaskRunner2Result result = taskRunner.run(); LOG.info("TaskRunner2Result: {}", result); @@ -314,7 +320,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, e, "TaskExecutionFailure: " + e.getMessage()); } } finally { - tezThreadDumpHelper.stop(); + for (TezTaskAttemptHook hook : hooks) { + hook.stop(); + } FileSystem.closeAllForUGI(childUGI); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java new file mode 100644 index 0000000000..ecb87a533d --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java @@ -0,0 +1,23 @@ +package org.apache.tez.runtime.task; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezTaskAttemptHook; + +/** + * A task attempt hook which dumps thread information periodically. + */ +public class ThreadDumpTaskAttemptHook implements TezTaskAttemptHook { + private TezThreadDumpHelper helper; + + @Override + public void start(TezTaskAttemptID id, Configuration conf) { + helper = TezThreadDumpHelper.getInstance(conf).start(id.toString()); + } + + @Override + public void stop() { + helper.stop(); + } +} From c1a162a0cb6942d79bbe61b0d05b31666dd5e2dd Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 18 Dec 2023 23:33:51 +0900 Subject: [PATCH 2/8] Add licenses --- .../apache/tez/runtime/hook/TezDAGHook.java | 18 ++++++++++++++++++ .../tez/runtime/hook/TezTaskAttemptHook.java | 18 ++++++++++++++++++ .../apache/tez/dag/app/ThreadDumpDAGHook.java | 18 ++++++++++++++++++ .../task/ThreadDumpTaskAttemptHook.java | 18 ++++++++++++++++++ 4 files changed, 72 insertions(+) diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java index c9ce95810f..cbc92817a9 100644 --- a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.runtime.hook; import org.apache.hadoop.classification.InterfaceAudience; diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java index a83cb326bd..54931b64d5 100644 --- a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.runtime.hook; import org.apache.hadoop.classification.InterfaceAudience; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java index 0f0e697f4d..ff657e47f1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.dag.app; import org.apache.hadoop.conf.Configuration; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java index ecb87a533d..dd41cee9d2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.runtime.task; import org.apache.hadoop.conf.Configuration; From ba7e33a3659d292d5bfc940227d6096e2fb803e3 Mon Sep 17 00:00:00 2001 From: okumin Date: Tue, 19 Dec 2023 00:36:55 +0900 Subject: [PATCH 3/8] Follow checkstyle --- .../apache/tez/runtime/hook/package-info.java | 22 +++++++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java new file mode 100644 index 0000000000..d977897d86 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@Private +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience.Private; \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 2656c04490..2b1c27db44 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2606,7 +2606,7 @@ private void startDAGExecution(DAG dag, final Map additio currentDAG = dag; final Configuration conf = dag.getConf(); final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]); - final TezDAGHook[] hooks = new TezDAGHook[hookClasses.length]; + hooks = new TezDAGHook[hookClasses.length]; for (int i = 0; i < hooks.length; i++) { hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); hooks[i].start(dag.getID(), conf); From 1673bd5a88f94b097ca3c8cb627553e30efb6b6c Mon Sep 17 00:00:00 2001 From: okumin Date: Sun, 22 Dec 2024 00:31:39 +0900 Subject: [PATCH 4/8] Rename tez.thread.dump.interval into tez.hook.thread.dump.interval --- .../java/org/apache/tez/dag/api/TezConfiguration.java | 4 ++-- .../org/apache/tez/runtime/TezThreadDumpHelper.java | 8 ++++---- .../src/test/java/org/apache/tez/test/TestTezJobs.java | 10 ++++++++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 8862f4b7d6..2d9cbaf55a 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2303,8 +2303,8 @@ static Set getPropertySet() { */ @ConfigurationScope(Scope.DAG) @ConfigurationProperty - public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; - public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms"; + public static final String TEZ_HOOK_THREAD_DUMP_INTERVAL = "tez.hook.thread.dump.interval"; + public static final String TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT = "100ms"; /** * Limits the amount of data that can be written to LocalFileSystem by a Task. diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 6f3e9fec1e..261532f137 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -40,8 +40,8 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT; public class TezThreadDumpHelper { @@ -74,8 +74,8 @@ public TezThreadDumpHelper() { } public static TezThreadDumpHelper getInstance(Configuration conf) { - long periodicThreadDumpFrequency = - conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_HOOK_THREAD_DUMP_INTERVAL, + TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); if (periodicThreadDumpFrequency > 0) { try { diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 892629f29e..c94a843b0b 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -18,7 +18,9 @@ package org.apache.tez.test; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_AM_HOOKS; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS; import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -59,11 +61,13 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.dag.app.ThreadDumpDAGHook; import org.apache.tez.mapreduce.examples.CartesianProduct; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -555,7 +559,9 @@ public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDump) org.apache.log4j.Logger.getRootLogger().addAppender(appender); appender.setName(TEZ_CONTAINER_LOGGER_NAME); appender.setContainerLogDir(logPath.toString()); - newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms"); + newConf.set(TEZ_AM_HOOKS, ThreadDumpDAGHook.class.getName()); + newConf.set(TEZ_TASK_ATTEMPT_HOOKS, ThreadDumpTaskAttemptHook.class.getName()); + newConf.set(TEZ_HOOK_THREAD_DUMP_INTERVAL, "1ms"); } sortMergeJoinExample.setConf(newConf); Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir"); From 79fe8d036a3fe7cae0a66163a7a8d983ffa3a14e Mon Sep 17 00:00:00 2001 From: okumin Date: Sun, 22 Dec 2024 00:48:59 +0900 Subject: [PATCH 5/8] Remove NoopTezThreadDumpHelper --- .../tez/runtime/TezThreadDumpHelper.java | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 261532f137..e8334dd906 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -24,8 +24,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Appender; +import org.apache.tez.common.Preconditions; import org.apache.tez.common.TezContainerLogAppender; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezUncheckedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +47,6 @@ public class TezThreadDumpHelper { - public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper(); private long duration = 0L; private Path basePath = null; private FileSystem fs = null; @@ -70,21 +71,17 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio "path: {}", duration, basePath); } - public TezThreadDumpHelper() { - } - public static TezThreadDumpHelper getInstance(Configuration conf) { long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_HOOK_THREAD_DUMP_INTERVAL, TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be positive duration", + TEZ_HOOK_THREAD_DUMP_INTERVAL); - if (periodicThreadDumpFrequency > 0) { - try { - return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf); - } catch (IOException e) { - LOG.warn("Can not initialize periodic thread dump service", e); - } + try { + return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf); + } catch (IOException e) { + throw new TezUncheckedException("Can not initialize periodic thread dump service", e); } - return NOOP_TEZ_THREAD_DUMP_HELPER; } public TezThreadDumpHelper start(String name) { @@ -178,18 +175,4 @@ private String getTaskName(long id, String taskName) { return id + " (" + taskName + ")"; } } - - private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper { - - @Override - public TezThreadDumpHelper start(String name) { - // Do Nothing - return this; - } - - @Override - public void stop() { - // Do Nothing - } - } } From 943012ab3aeefcfe2e154a669e45c0999c366cb1 Mon Sep 17 00:00:00 2001 From: okumin Date: Sun, 22 Dec 2024 00:56:03 +0900 Subject: [PATCH 6/8] Rephrase some comments --- .../src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java | 2 +- tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java index cbc92817a9..7fb015bdb1 100644 --- a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java @@ -24,7 +24,7 @@ import org.apache.tez.dag.records.TezDAGID; /** - * A hook which is instantiated and triggered before and after a DAG is exeucted. + * A hook which is instantiated and triggered before and after a DAG is executed. */ @InterfaceAudience.Public @InterfaceStability.Evolving diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 2b1c27db44..4172a5a368 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2229,7 +2229,7 @@ public Void run() throws Exception { execService.shutdownNow(); } - // Check if the thread dump service is up in any case, if yes attempt a shutdown + // Try to shut down any hooks that are still active for (TezDAGHook hook : hooks) { hook.stop(); } From 293ce63e334e732c742cdc29ed6fe0fd9635d097 Mon Sep 17 00:00:00 2001 From: okumin Date: Sun, 22 Dec 2024 18:19:33 +0900 Subject: [PATCH 7/8] Rename tez.hook.thread.dump.interval back --- .../java/org/apache/tez/dag/api/TezConfiguration.java | 4 ++-- .../org/apache/tez/runtime/TezThreadDumpHelper.java | 10 +++++----- .../src/test/java/org/apache/tez/test/TestTezJobs.java | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 2d9cbaf55a..8862f4b7d6 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2303,8 +2303,8 @@ static Set getPropertySet() { */ @ConfigurationScope(Scope.DAG) @ConfigurationProperty - public static final String TEZ_HOOK_THREAD_DUMP_INTERVAL = "tez.hook.thread.dump.interval"; - public static final String TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT = "100ms"; + public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; + public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms"; /** * Limits the amount of data that can be written to LocalFileSystem by a Task. diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index e8334dd906..de28d58e19 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -42,8 +42,8 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR; import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT; public class TezThreadDumpHelper { @@ -72,10 +72,10 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio } public static TezThreadDumpHelper getInstance(Configuration conf) { - long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_HOOK_THREAD_DUMP_INTERVAL, - TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, + TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be positive duration", - TEZ_HOOK_THREAD_DUMP_INTERVAL); + TEZ_THREAD_DUMP_INTERVAL); try { return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index c94a843b0b..ee717f33c0 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -19,7 +19,7 @@ package org.apache.tez.test; import static org.apache.tez.dag.api.TezConfiguration.TEZ_AM_HOOKS; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS; import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME; import static org.junit.Assert.assertEquals; @@ -561,7 +561,7 @@ public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDump) appender.setContainerLogDir(logPath.toString()); newConf.set(TEZ_AM_HOOKS, ThreadDumpDAGHook.class.getName()); newConf.set(TEZ_TASK_ATTEMPT_HOOKS, ThreadDumpTaskAttemptHook.class.getName()); - newConf.set(TEZ_HOOK_THREAD_DUMP_INTERVAL, "1ms"); + newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms"); } sortMergeJoinExample.setConf(newConf); Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir"); From 61d8249fc9051acc1b98dfe6059ea00c4634e6ce Mon Sep 17 00:00:00 2001 From: okumin Date: Sun, 22 Dec 2024 18:27:48 +0900 Subject: [PATCH 8/8] Mark some variables final --- .../java/org/apache/tez/runtime/TezThreadDumpHelper.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index de28d58e19..022186a4b8 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -47,9 +47,9 @@ public class TezThreadDumpHelper { - private long duration = 0L; - private Path basePath = null; - private FileSystem fs = null; + private final long duration; + private final Path basePath; + private final FileSystem fs; private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean(); private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class);