From 8f6283a7c407d28c043523d91b8c3a24da0eff52 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 4 Apr 2017 16:52:51 -0700 Subject: [PATCH 1/4] Tue Apr 4 16:52:51 PDT 2017 --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 99b1608010ddb..6dafd95a5b486 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -432,7 +432,7 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) - case _: InterruptedException if task.reasonIfKilled.isDefined => + case _: Throwable if task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() From 9d59960626178acb68918f1fce1a4f85b0da7493 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 4 Apr 2017 17:04:06 -0700 Subject: [PATCH 2/4] Tue Apr 4 17:04:06 PDT 2017 --- .../test/scala/org/apache/spark/SparkContextSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 2c947556dfd30..735f4454e299e 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -572,7 +572,13 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu // first attempt will hang if (!SparkContextSuite.isTaskStarted) { SparkContextSuite.isTaskStarted = true - Thread.sleep(9999999) + try { + Thread.sleep(9999999) + } catch { + case t: Throwable => + // SPARK-20217 should not fail stage if task throws non-interrupted exception + throw new RuntimeException("killed") + } } // second attempt succeeds immediately } From 28ebc94d0e79bad77128a2ae6a8701f4297e1e75 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 5 Apr 2017 11:50:42 -0700 Subject: [PATCH 3/4] Wed Apr 5 11:50:42 PDT 2017 --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 6dafd95a5b486..7db4f986ed2a6 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -432,7 +432,7 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) - case _: Throwable if task.reasonIfKilled.isDefined => + case NonFatal(_) if task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() From 42f49f21f20e82cc64d346b163523a9d17ae7bf8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 5 Apr 2017 11:58:53 -0700 Subject: [PATCH 4/4] add null check for task --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 7db4f986ed2a6..83469c5ff0600 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -432,7 +432,7 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) - case NonFatal(_) if task.reasonIfKilled.isDefined => + case NonFatal(_) if task != null && task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus()