36 changes: 0 additions & 36 deletions core/src/main/scala/org/apache/spark/util/SignalLogger.scala

This file was deleted.

core/src/main/scala/org/apache/spark/util/{Signaling.scala → SignalUtils.scala}
@@ -17,28 +17,69 @@

package org.apache.spark.util

import java.util.{Collections, LinkedList}
import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.commons.lang3.SystemUtils
import org.slf4j.Logger
import sun.misc.{Signal, SignalHandler}

import org.apache.spark.internal.Logging


/**
 * Contains utilities for working with posix signals.
 */
private[spark] object Signaling extends Logging {
private[spark] object SignalUtils extends Logging {

  /** A flag to make sure we only register the logger once. */
  private var loggerRegistered = false

  /** Register a signal handler to log signals on UNIX-like systems. */
  def registerLogger(log: Logger): Unit = synchronized {
Member:

I specifically kept logging separate as it is a specific use case of signaling, whereas my original Signaling object just provided the base functionality of registering arbitrary functions to be called on signals.

Contributor Author:

Yeah, but it doesn't make sense to have that class hanging there by itself with a tiny bit of functionality.

Member:

I thought it would be clearer to keep distinct functionality in distinct files.

Contributor Author:

How are they distinct? They both just do something with signals.

Contributor Author:

Think about it for a second: by your definition we should probably have ~1 million files in Spark, since every function does something slightly different.

Member:

Yeah, but one provides the ability (and is used from other places too); the other uses it.

Contributor Author:

You do know that statement applies to almost all the code in Spark, right? :)

Member:

That's true :) I think it's just a matter of preference and what you consider distinct functionality (I actually started by removing logging but then re-added it on second thought). I agree that both classes provide only utilities, so it's really a minor issue.
My only concern is the use of the registered variable (see below).

    if (!loggerRegistered) {
      Seq("TERM", "HUP", "INT").foreach { sig =>
        SignalUtils.register(sig) {
          log.error("RECEIVED SIGNAL " + sig)
          false
        }
      }
      loggerRegistered = true
    }
  }
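For orientation, this is the piece initDaemon wires up (see the Utils.scala hunk further down). A minimal sketch of calling it directly, assuming an slf4j Logger is at hand; the logger name is invented for the demo:

    import org.slf4j.LoggerFactory
    import org.apache.spark.util.SignalUtils

    val log = LoggerFactory.getLogger("demo-daemon")
    SignalUtils.registerLogger(log) // idempotent: guarded by loggerRegistered
    // From a shell, `kill -TERM <pid>` now logs "RECEIVED SIGNAL TERM";
    // the action returns false, so the signal escalates and the JVM exits.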

  /**
   * Adds an action to be run when a given signal is received by this process.
   *
   * Note that signals are only supported on unix-like operating systems and work on a best-effort
   * basis: if a signal is not available or cannot be intercepted, only a warning is emitted.
   *
   * All actions for a given signal are run in a separate thread.
   */
  def register(signal: String)(action: => Boolean): Unit = synchronized {
    if (SystemUtils.IS_OS_UNIX) {
      try {
        val handler = handlers.getOrElseUpdate(signal, {
          logInfo("Registered signal handler for " + signal)
          new ActionHandler(new Signal(signal))
        })
        handler.register(action)
      } catch {
        case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex)
      }
    }
  }
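A usage sketch for register itself. The caller below is hypothetical, and since SignalUtils is private[spark], a real caller would have to live somewhere under the org.apache.spark package:

    package org.apache.spark.demo

    import org.apache.spark.util.SignalUtils

    object SigIntExample {
      def main(args: Array[String]): Unit = {
        // Returning true marks the signal as handled, so it is not
        // escalated to the previous handler and the process keeps running.
        SignalUtils.register("INT") {
          System.err.println("Swallowing Ctrl+C; send SIGTERM to stop.")
          true
        }
        Thread.sleep(60000) // keep the JVM alive so the signal can arrive
      }
    }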

  /**
   * A handler for the given signal that runs a collection of actions.
   */
  private class ActionHandler(signal: Signal) extends SignalHandler {

    private val actions = Collections.synchronizedList(new LinkedList[() => Boolean])
    /**
     * List of actions upon the signal; the callbacks should return true if the signal is "handled",
     * i.e. should not escalate to the next callback.
     */
    private val actions = Collections.synchronizedList(new java.util.LinkedList[() => Boolean])
Member:

Is it preferred to use fully qualified names when using Java collections?

Contributor Author:

It just gets confusing when we mix Scala and Java collections, so I thought I would just fully qualify them.


    // original signal handler, before this handler was attached
    private val prevHandler: SignalHandler = Signal.handle(signal, this)
@@ -51,11 +92,10 @@ private[spark] object Signaling extends Logging {
      // register old handler, will receive incoming signals while this handler is running
      Signal.handle(signal, prevHandler)

      val escalate = actions.asScala forall { action =>
        !action()
      }

      if(escalate) {
      // run all actions, escalate to parent handler if no action catches the signal
      // (i.e. all actions return false)
      val escalate = actions.asScala.forall { action => !action() }
Contributor Author:

Can you write a short description here of what this is doing? It's not very straightforward (especially the forall and the boolean inversion).

Member:

Sure, I initially had comments but thought they were overkill since the behaviour can be deduced from the comments in the register() method. I'll add some more info.

Member:

How should I add the comments? Just post them here and you'll update the PR?

Contributor Author:

Sounds good.

Member (@jodersky, Apr 22, 2016):

    /**
     * Called when this handler's signal is received. Note that if the same signal is received
     * before this method returns, it is escalated to the previous handler.
     */
    override def handle(sig: Signal): Unit = {
      // register old handler, will receive incoming signals while this handler is running
      Signal.handle(signal, prevHandler)

      // run all actions, escalate to parent handler if no action says the signal was caught
      // (i.e. all actions return false) 
      val escalate = actions.asScala.forall { action => !action() }
      if (escalate) {
        prevHandler.handle(sig)
      }

      // re-register this handler
      Signal.handle(signal, this)
    }

Is this better? I can add even more if you prefer.

      if (escalate) {
        prevHandler.handle(sig)
      }
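To unpack the forall-plus-inversion idiom discussed above, a stand-alone illustration with plain function values in place of registered actions (all names here are demo-only):

    val demoActions = Seq(
      () => false, // not handled: !false == true, so forall continues
      () => true,  // handled:     !true == false, forall stops here
      () => false  // never runs: forall short-circuits on the first false
    )
    val escalate = demoActions.forall(action => !action())
    assert(!escalate) // an action handled the signal, so don't escalate

escalate is true only when every action returns false, i.e. nobody claimed the signal.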

@@ -64,36 +104,13 @@ private[spark] object Signaling extends Logging {
    }

    /**
     * Add an action to be run by this handler.
     * Adds an action to be run by this handler.
     * @param action An action to be run when a signal is received. Return true if the signal
     * should be stopped with this handler, false if it should be escalated.
     *               should be stopped with this handler, false if it should be escalated.
     */
    def register(action: => Boolean): Unit = actions.add(() => action)

  }
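One detail worth spelling out: action is a by-name parameter, so register stores it unevaluated, and wrapping it as () => action turns it into a reusable thunk. A small sketch of that behaviour (names invented for the demo):

    var evaluations = 0

    // Mirrors ActionHandler.register: wrap a by-name argument in a thunk.
    def capture(action: => Boolean): () => Boolean = () => action

    val thunk = capture { evaluations += 1; true }
    assert(evaluations == 0) // by-name: nothing has run at registration time
    thunk()
    thunk()
    assert(evaluations == 2) // the block re-runs on every invocation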

  // contains association of signals to their respective handlers
  private val handlers = new HashMap[String, ActionHandler]

  /**
   * Adds an action to be run when a given signal is received by this process.
   *
   * Note that signals are only supported on unix-like operating systems and work on a best-effort
   * basis: if a signal is not available or cannot be intercepted, only a warning is emitted.
   *
   * All actions for a given signal are run in a separate thread.
   */
  def register(signal: String)(action: => Boolean): Unit = synchronized {
    if (SystemUtils.IS_OS_UNIX) try {
      val handler = handlers.getOrElseUpdate(signal, {
        val h = new ActionHandler(new Signal(signal))
        logInfo("Registered signal handler for " + signal)
        h
      })
      handler.register(action)
    } catch {
      case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex)
    }
  }

  /** Mapping from signal to their respective handlers. */
  private val handlers = new scala.collection.mutable.HashMap[String, ActionHandler]
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2284,7 +2284,7 @@ private[spark] object Utils extends Logging {
   */
  def initDaemon(log: Logger): Unit = {
    log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
    SignalLogger.register(log)
    SignalUtils.registerLogger(log)
  }
}

4 changes: 2 additions & 2 deletions repl/src/main/scala/org/apache/spark/repl/Signaling.scala
@@ -19,7 +19,7 @@ package org.apache.spark.repl

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Signaling => USignaling}
import org.apache.spark.util.SignalUtils

private[repl] object Signaling extends Logging {

@@ -28,7 +28,7 @@ private[repl] object Signaling extends Logging {
   * when no jobs are currently running.
   * This makes it possible to interrupt a running shell job by pressing Ctrl+C.
   */
  def cancelOnInterrupt(ctx: SparkContext): Unit = USignaling.register("INT") {
  def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
    if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
      logWarning("Cancelling all active jobs, this can take a while. " +
        "Press Ctrl+C again to exit now.")
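The hunk is truncated at this point in the diff view. Based on the doc comment above, the method presumably completes along these lines: while jobs are active, cancel them all and report the signal as handled; otherwise escalate so Ctrl+C exits the shell. A sketch, not necessarily the exact merged code:

    def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
      if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
        logWarning("Cancelling all active jobs, this can take a while. " +
          "Press Ctrl+C again to exit now.")
        ctx.cancelAllJobs()
        true // handled: stay in the shell
      } else {
        false // nothing running: escalate, letting Ctrl+C exit as usual
      }
    }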
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -716,7 +716,7 @@ object ApplicationMaster extends Logging {
  private var master: ApplicationMaster = _

  def main(args: Array[String]): Unit = {
    SignalLogger.register(log)
    SignalUtils.registerLogger(log)
    val amArgs = new ApplicationMasterArguments(args)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClient)