From d0e1a8f305a2bcb2060bd8e28dd9e8f295578696 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 22 Apr 2016 00:21:08 -0700 Subject: [PATCH 1/4] [SPARK-10001] Consolidate Signaling and SignalLogger. --- .../org/apache/spark/util/SignalLogger.scala | 36 --------- .../{Signaling.scala => SignalUtils.scala} | 78 +++++++++++-------- .../scala/org/apache/spark/util/Utils.scala | 2 +- .../org/apache/spark/repl/Signaling.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- 5 files changed, 48 insertions(+), 72 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/SignalLogger.scala rename core/src/main/scala/org/apache/spark/util/{Signaling.scala => SignalUtils.scala} (69%) diff --git a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala b/core/src/main/scala/org/apache/spark/util/SignalLogger.scala deleted file mode 100644 index a793c9135eea9..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import org.slf4j.Logger - -/** - * Used to log signals received. This can be very useful in debugging crashes or kills. - */ -private[spark] object SignalLogger { - - private var registered = false - - /** Register a signal handler to log signals on UNIX-like systems. */ - def register(log: Logger): Unit = Seq("TERM", "HUP", "INT").foreach{ sig => - Signaling.register(sig) { - log.error("RECEIVED SIGNAL " + sig) - false - } - } -} diff --git a/core/src/main/scala/org/apache/spark/util/Signaling.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala similarity index 69% rename from core/src/main/scala/org/apache/spark/util/Signaling.scala rename to core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 2075cc45a9b8d..b35f5be28aff6 100644 --- a/core/src/main/scala/org/apache/spark/util/Signaling.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -17,12 +17,12 @@ package org.apache.spark.util -import java.util.{Collections, LinkedList} +import java.util.Collections import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap import org.apache.commons.lang3.SystemUtils +import org.slf4j.Logger import sun.misc.{Signal, SignalHandler} import org.apache.spark.internal.Logging @@ -31,14 +31,52 @@ import org.apache.spark.internal.Logging /** * Contains utilities for working with posix signals. */ -private[spark] object Signaling extends Logging { +private[spark] object SignalUtils extends Logging { + + private var registered = false + + /** Register a signal handler to log signals on UNIX-like systems. */ + def registerLogger(log: Logger): Unit = synchronized { + if (!registered) { + Seq("TERM", "HUP", "INT").foreach { sig => + SignalUtils.register(sig) { + log.error("RECEIVED SIGNAL " + sig) + false + } + } + registered = true + } + } + + /** + * Adds an action to be run when a given signal is received by this process. + * + * Note that signals are only supported on unix-like operating systems and work on a best-effort + * basis: if a signal is not available or cannot be intercepted, only a warning is emitted. + * + * All actions for a given signal are run in a separate thread. + */ + def register(signal: String)(action: => Boolean): Unit = synchronized { + if (SystemUtils.IS_OS_UNIX) { + try { + val handler = handlers.getOrElseUpdate(signal, { + val h = new ActionHandler(new Signal(signal)) + logInfo("Registered signal handler for " + signal) + h + }) + handler.register(action) + } catch { + case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex) + } + } + } /** * A handler for the given signal that runs a collection of actions. */ private class ActionHandler(signal: Signal) extends SignalHandler { - private val actions = Collections.synchronizedList(new LinkedList[() => Boolean]) + private val actions = Collections.synchronizedList(new java.util.LinkedList[() => Boolean]) // original signal handler, before this handler was attached private val prevHandler: SignalHandler = Signal.handle(signal, this) @@ -51,11 +89,8 @@ private[spark] object Signaling extends Logging { // register old handler, will receive incoming signals while this handler is running Signal.handle(signal, prevHandler) - val escalate = actions.asScala forall { action => - !action() - } - - if(escalate) { + val escalate = actions.asScala.forall { action => !action() } + if (escalate) { prevHandler.handle(sig) } @@ -69,31 +104,8 @@ private[spark] object Signaling extends Logging { * should be stopped with this handler, false if it should be escalated. */ def register(action: => Boolean): Unit = actions.add(() => action) - } // contains association of signals to their respective handlers - private val handlers = new HashMap[String, ActionHandler] - - /** - * Adds an action to be run when a given signal is received by this process. - * - * Note that signals are only supported on unix-like operating systems and work on a best-effort - * basis: if a signal is not available or cannot be intercepted, only a warning is emitted. - * - * All actions for a given signal are run in a separate thread. - */ - def register(signal: String)(action: => Boolean): Unit = synchronized { - if (SystemUtils.IS_OS_UNIX) try { - val handler = handlers.getOrElseUpdate(signal, { - val h = new ActionHandler(new Signal(signal)) - logInfo("Registered signal handler for " + signal) - h - }) - handler.register(action) - } catch { - case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex) - } - } - + private val handlers = new scala.collection.mutable.HashMap[String, ActionHandler] } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 848f7d7adbc7e..ea49991493fd7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2284,7 +2284,7 @@ private[spark] object Utils extends Logging { */ def initDaemon(log: Logger): Unit = { log.info(s"Started daemon with process name: ${Utils.getProcessName()}") - SignalLogger.register(log) + SignalUtils.registerLogger(log) } } diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala index c305ed545c4c9..eb284d3acd19e 100644 --- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala +++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala @@ -19,7 +19,7 @@ package org.apache.spark.repl import org.apache.spark.SparkContext import org.apache.spark.internal.Logging -import org.apache.spark.util.{Signaling => USignaling} +import org.apache.spark.util.{SignalUtils => USignaling} private[repl] object Signaling extends Logging { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 5bb63500c8f88..4df90d7b6b0b8 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -716,7 +716,7 @@ object ApplicationMaster extends Logging { private var master: ApplicationMaster = _ def main(args: Array[String]): Unit = { - SignalLogger.register(log) + SignalUtils.registerLogger(log) val amArgs = new ApplicationMasterArguments(args) SparkHadoopUtil.get.runAsSparkUser { () => master = new ApplicationMaster(amArgs, new YarnRMClient) From 2a981a319f51ada0840135440f6884ebdc80f664 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 22 Apr 2016 00:23:22 -0700 Subject: [PATCH 2/4] import --- repl/src/main/scala/org/apache/spark/repl/Signaling.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala index eb284d3acd19e..202febf144626 100644 --- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala +++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala @@ -19,7 +19,7 @@ package org.apache.spark.repl import org.apache.spark.SparkContext import org.apache.spark.internal.Logging -import org.apache.spark.util.{SignalUtils => USignaling} +import org.apache.spark.util.SignalUtils private[repl] object Signaling extends Logging { @@ -28,7 +28,7 @@ private[repl] object Signaling extends Logging { * when no jobs are currently running. * This makes it possible to interrupt a running shell job by pressing Ctrl+C. */ - def cancelOnInterrupt(ctx: SparkContext): Unit = USignaling.register("INT") { + def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") { if (!ctx.statusTracker.getActiveJobIds().isEmpty) { logWarning("Cancelling all active jobs, this can take a while. " + "Press Ctrl+C again to exit now.") From fa1c83a61e7b31bf225d17ca479949b479a7176a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 22 Apr 2016 00:25:01 -0700 Subject: [PATCH 3/4] minor update --- core/src/main/scala/org/apache/spark/util/SignalUtils.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index b35f5be28aff6..e71f85a1324ed 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -60,9 +60,8 @@ private[spark] object SignalUtils extends Logging { if (SystemUtils.IS_OS_UNIX) { try { val handler = handlers.getOrElseUpdate(signal, { - val h = new ActionHandler(new Signal(signal)) logInfo("Registered signal handler for " + signal) - h + new ActionHandler(new Signal(signal)) }) handler.register(action) } catch { From 9a8f93c5592c8bed74a70d3eb7f3773966361567 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 22 Apr 2016 01:01:33 -0700 Subject: [PATCH 4/4] code review --- .../org/apache/spark/util/SignalUtils.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index e71f85a1324ed..9479d8f74dd25 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -27,24 +27,24 @@ import sun.misc.{Signal, SignalHandler} import org.apache.spark.internal.Logging - /** * Contains utilities for working with posix signals. */ private[spark] object SignalUtils extends Logging { - private var registered = false + /** A flag to make sure we only register the logger once. */ + private var loggerRegistered = false /** Register a signal handler to log signals on UNIX-like systems. */ def registerLogger(log: Logger): Unit = synchronized { - if (!registered) { + if (!loggerRegistered) { Seq("TERM", "HUP", "INT").foreach { sig => SignalUtils.register(sig) { log.error("RECEIVED SIGNAL " + sig) false } } - registered = true + loggerRegistered = true } } @@ -75,6 +75,10 @@ private[spark] object SignalUtils extends Logging { */ private class ActionHandler(signal: Signal) extends SignalHandler { + /** + * List of actions upon the signal; the callbacks should return true if the signal is "handled", + * i.e. should not escalate to the next callback. + */ private val actions = Collections.synchronizedList(new java.util.LinkedList[() => Boolean]) // original signal handler, before this handler was attached @@ -88,6 +92,8 @@ private[spark] object SignalUtils extends Logging { // register old handler, will receive incoming signals while this handler is running Signal.handle(signal, prevHandler) + // run all actions, escalate to parent handler if no action catches the signal + // (i.e. all actions return false) val escalate = actions.asScala.forall { action => !action() } if (escalate) { prevHandler.handle(sig) @@ -98,13 +104,13 @@ private[spark] object SignalUtils extends Logging { } /** - * Add an action to be run by this handler. + * Adds an action to be run by this handler. * @param action An action to be run when a signal is received. Return true if the signal - * should be stopped with this handler, false if it should be escalated. + * should be stopped with this handler, false if it should be escalated. */ def register(action: => Boolean): Unit = actions.add(() => action) } - // contains association of signals to their respective handlers + /** Mapping from signal to their respective handlers. */ private val handlers = new scala.collection.mutable.HashMap[String, ActionHandler] }