From 4cb9fd6abb61e841d1014273a3b8fe95e4dfd184 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Tue, 22 Apr 2014 16:57:13 -0700
Subject: [PATCH 1/4] SPARK-1582 Invoke Thread.interrupt() when cancelling jobs

Sometimes executor threads are blocked waiting for IO or monitors, and the
current implementation of job killing may never recover these threads. By
simply invoking Thread.interrupt() during cancellation, we can often safely
unblock the threads and use them for subsequent work.

Note that this feature must remain optional for now because of a bug in HDFS
where Thread.interrupt() may cause nodes to be marked as permanently dead (as
the InterruptedException is reinterpreted as an IOException during
communication with some node).
---
 .../main/scala/org/apache/spark/SparkContext.scala    | 11 ++++++++++-
 .../org/apache/spark/api/python/PythonRDD.scala       |  1 -
 .../executor/CoarseGrainedExecutorBackend.scala       |  4 ++--
 .../scala/org/apache/spark/executor/Executor.scala    | 10 +++++-----
 .../apache/spark/executor/MesosExecutorBackend.scala  |  3 ++-
 .../org/apache/spark/scheduler/DAGScheduler.scala     |  5 ++++-
 .../apache/spark/scheduler/SchedulerBackend.scala     |  3 ++-
 .../main/scala/org/apache/spark/scheduler/Task.scala  | 12 ++++++++++--
 .../org/apache/spark/scheduler/TaskScheduler.scala    |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala    |  4 ++--
 .../scala/org/apache/spark/scheduler/TaskSet.scala    |  4 ++--
 .../cluster/CoarseGrainedClusterMessage.scala         |  3 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala       |  8 ++++----
 .../apache/spark/scheduler/local/LocalBackend.scala   | 10 +++++-----
 14 files changed, 51 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 25ca650a3a37e..ce66685e85710 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -381,16 +381,23 @@ class SparkContext(config: SparkConf) extends Logging {
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel")
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
    */
-  def setJobGroup(groupId: String, description: String) {
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
   }

   /** Clear the current thread's job group ID and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }

   // Post init
@@ -1244,6 +1251,8 @@ object SparkContext extends Logging {

   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

+  private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+
   private[spark] val SPARK_UNKNOWN_USER = ""

   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f9d86fed34d0f..d7ffbf70e40dc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -162,7 +162,6 @@ private[spark] class PythonRDD[T: ClassTag](
             val update = new Array[Byte](updateLen)
             stream.readFully(update)
             accumulator += Collections.singletonList(update)
-          }
           Array.empty[Byte]
         }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6327ac01663f6..9ac7365f47f9f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -69,12 +69,12 @@ private[spark] class CoarseGrainedExecutorBackend(
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }

-    case KillTask(taskId, _) =>
+    case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
         logError("Received KillTask command but executor was null")
         System.exit(1)
       } else {
-        executor.killTask(taskId)
+        executor.killTask(taskId, interruptThread)
       }

     case x: DisassociatedEvent =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2bfb9c387e1c9..914bc205cebe2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,10 +136,10 @@ private[spark] class Executor(
     threadPool.execute(tr)
   }

-  def killTask(taskId: Long) {
+  def killTask(taskId: Long, interruptThread: Boolean) {
     val tr = runningTasks.get(taskId)
     if (tr != null) {
-      tr.kill()
+      tr.kill(interruptThread)
     }
   }

@@ -166,11 +166,11 @@ private[spark] class Executor(
     @volatile private var killed = false
     @volatile private var task: Task[Any] = _

-    def kill() {
+    def kill(interruptThread: Boolean) {
       logInfo("Executor is trying to kill task " + taskId)
       killed = true
       if (task != null) {
-        task.kill()
+        task.kill(interruptThread)
       }
     }

@@ -257,7 +257,7 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
         }

-        case TaskKilledException => {
+        case TaskKilledException | _: InterruptedException if task.killed => {
           logInfo("Executor killed task " + taskId)
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
         }
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 6fc702fdb1512..64e24506e8038 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -76,7 +76,8 @@ private[spark] class MesosExecutorBackend
     if (executor == null) {
       logError("Received KillTask but executor was null")
     } else {
-      executor.killTask(t.getValue.toLong)
+      // TODO: Determine the 'interruptOnCancel' property set for the given job.
+      executor.killTask(t.getValue.toLong, interruptThread = false)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c6cbf14e20069..bee7f1be5d97e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1055,6 +1055,9 @@ class DAGScheduler(
     val error = new SparkException(failureReason)
     job.listener.jobFailed(error)

+    val shouldInterruptThread =
+      job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
+
     // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
@@ -1073,7 +1076,7 @@ class DAGScheduler(
       // This is the only job that uses this stage, so fail the stage if it is running.
       val stage = stageIdToStage(stageId)
       if (runningStages.contains(stage)) {
-        taskScheduler.cancelTasks(stageId)
+        taskScheduler.cancelTasks(stageId, shouldInterruptThread)
         val stageInfo = stageToInfos(stage)
         stageInfo.stageFailed(failureReason)
         listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index f1924a4573b21..6a6d8e609bc39 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -28,5 +28,6 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int

-  def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
+  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
+    throw new UnsupportedOperationException
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a8bcb7dfe2f3c..2ca3479c80efc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -44,8 +44,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   final def run(attemptId: Long): T = {
     context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+    taskThread = Thread.currentThread()
     if (_killed) {
-      kill()
+      kill(interruptThread = false)
     }
     runTask(context)
   }
@@ -62,6 +63,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // Task context, to be initialized in run().
   @transient protected var context: TaskContext = _

+  // The actual Thread on which the task is running, if any. Initialized in run().
+  @volatile @transient private var taskThread: Thread = _
+
   // A flag to indicate whether the task is killed. This is used in case context is not yet
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false
@@ -75,12 +79,16 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
    * be called multiple times.
+   * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
    */
-  def kill() {
+  def kill(interruptThread: Boolean) {
     _killed = true
     if (context != null) {
       context.interrupted = true
     }
+    if (interruptThread && taskThread != null) {
+      taskThread.interrupt()
+    }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 92616c997e20c..819c35257b5a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -47,7 +47,7 @@ private[spark] trait TaskScheduler {
   def submitTasks(taskSet: TaskSet): Unit

   // Cancel a stage.
-  def cancelTasks(stageId: Int)
+  def cancelTasks(stageId: Int, interruptThread: Boolean)

   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index fe72ab3e43146..be19d9b8854c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -170,7 +170,7 @@ private[spark] class TaskSchedulerImpl(
     backend.reviveOffers()
   }

-  override def cancelTasks(stageId: Int): Unit = synchronized {
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
       // There are two possible cases here:
@@ -181,7 +181,7 @@
       //    simply abort the stage.
         tsm.runningTasksSet.foreach { tid =>
           val execId = taskIdToExecutorId(tid)
-          backend.killTask(tid, execId)
+          backend.killTask(tid, execId, interruptThread)
         }
         tsm.abort("Stage %s cancelled".format(stageId))
         logInfo("Stage %d was cancelled".format(stageId))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 03bf76083761f..613fa7850bb25 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,8 +31,8 @@ private[spark] class TaskSet(
     val properties: Properties) {
   val id: String = stageId + "." + attempt

-  def kill() {
-    tasks.foreach(_.kill())
+  def kill(interruptThread: Boolean) {
+    tasks.foreach(_.kill(interruptThread))
   }

   override def toString: String = "TaskSet " + id
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4a9a1659d8254..ddbc74e82ac49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,7 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
   // Driver to executors
   case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage

-  case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
+  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
+    extends CoarseGrainedClusterMessage

   case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
     extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7bfc30b4208a3..a6d6b3d26a3c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -101,8 +101,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case ReviveOffers =>
         makeOffers()

-      case KillTask(taskId, executorId) =>
-        executorActor(executorId) ! KillTask(taskId, executorId)
+      case KillTask(taskId, executorId, interruptThread) =>
+        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)

       case StopDriver =>
         sender ! true
@@ -207,8 +207,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     driverActor ! ReviveOffers
   }

-  override def killTask(taskId: Long, executorId: String) {
-    driverActor ! KillTask(taskId, executorId)
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+    driverActor ! KillTask(taskId, executorId, interruptThread)
   }

   override def defaultParallelism(): Int = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 16e2f5cf3076d..43f0e18a0cbe0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -30,7 +30,7 @@ private case class ReviveOffers()

 private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)

-private case class KillTask(taskId: Long)
+private case class KillTask(taskId: Long, interruptThread: Boolean)

 /**
  * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
@@ -61,8 +61,8 @@ private[spark] class LocalActor(
       reviveOffers()
     }

-    case KillTask(taskId) =>
-      executor.killTask(taskId)
+    case KillTask(taskId, interruptThread) =>
+      executor.killTask(taskId, interruptThread)
   }

   def reviveOffers() {
@@ -99,8 +99,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:

   override def defaultParallelism() = totalCores

-  override def killTask(taskId: Long, executorId: String) {
-    localActor ! KillTask(taskId)
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+    localActor ! KillTask(taskId, interruptThread)
   }

   override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
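As background for the first commit message: a task thread blocked on IO or a monitor never gets a chance to re-check Spark's kill flag, but it does wake up when interrupted. The standalone Scala sketch below (illustrative only, not part of the patch series; all names are invented) demonstrates the JVM behaviour that kill(interruptThread = true) relies on.

  object InterruptDemo {
    def main(args: Array[String]) {
      val worker = new Thread(new Runnable {
        def run() {
          try {
            // Stand-in for a blocking read or monitor wait inside a task;
            // a killed flag alone would never be observed here.
            Thread.sleep(Long.MaxValue)
          } catch {
            case _: InterruptedException =>
              println("woken by Thread.interrupt(); the thread can now be reused")
          }
        }
      })
      worker.start()
      Thread.sleep(100)   // give the worker time to block
      worker.interrupt()  // what Task.kill(interruptThread = true) triggers via taskThread.interrupt()
      worker.join()
    }
  }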
From b67f472246af1852b8c715c001190408c02f5681 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Wed, 23 Apr 2014 13:06:45 -0700
Subject: [PATCH 2/4] Add comment on why interruptOnCancel is in setJobGroup

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ce66685e85710..489546d07aea3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -390,6 +390,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
+    // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
+    // APIs to also take advantage of this property (e.g., internal job failures or canceling from
+    // JobProgressTab UI) on a per-job basis.
     setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
   }

From 82f78bb67681c3886172e3919f949ccc560880fc Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Wed, 23 Apr 2014 13:49:41 -0700
Subject: [PATCH 3/4] Update DAGSchedulerSuite

---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 35a7ac9d049c2..ff69eb7e53f8e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -58,7 +58,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
     }
-    override def cancelTasks(stageId: Int) {
+    override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}

From e52b82904d6f29adac343f672a1a4621b9d27a23 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Wed, 23 Apr 2014 15:21:20 -0700
Subject: [PATCH 4/4] Don't use job.properties when null

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bee7f1be5d97e..dbde9b591dccc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1056,7 +1056,8 @@ class DAGScheduler(
     job.listener.jobFailed(error)

     val shouldInterruptThread =
-      job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
+      if (job.properties == null) false
+      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean

     // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
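Taken together, the four patches leave setJobGroup as the user-facing switch for this behaviour. A minimal usage sketch follows (illustrative only; the group name, description, and input path are invented, and an existing SparkContext named sc is assumed):

  // Mark this thread's jobs as belonging to an interruptible job group.
  sc.setJobGroup("my_group", "interruptible job group", interruptOnCancel = true)
  val lineCount = sc.textFile("hdfs://namenode/data/input.txt").count()

  // In a separate thread: kills the group's running tasks and, because
  // interruptOnCancel was set, also calls Thread.interrupt() on their executor threads.
  sc.cancelJobGroup("my_group")

The flag remains off by default because of the HDFS-1208 issue noted in the setJobGroup scaladoc above.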