From 31f12188c6d217cac8dd6a0767433091b153a38d Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 14 Jul 2016 07:36:36 -0700 Subject: [PATCH 1/3] CoarseGrainedExecutorBackend to self kill if there is an exception while creating an Executor --- .../CoarseGrainedExecutorBackend.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index ccc6c36e9c79a..1cdb1df22b6d3 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} +import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.TaskState.TaskState @@ -64,8 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend( case Success(msg) => // Always receive `true`. Just ignore it case Failure(e) => - logError(s"Cannot register with driver: $driverUrl", e) - exitExecutor(1) + exitExecutor(1, s"Cannot register with driver: $driverUrl", e) }(ThreadUtils.sameThread) } @@ -78,16 +78,19 @@ private[spark] class CoarseGrainedExecutorBackend( override def receive: PartialFunction[Any, Unit] = { case RegisteredExecutor => logInfo("Successfully registered with driver") - executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) + try { + executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) + } catch { + case NonFatal(e) => + exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) + } case RegisterExecutorFailed(message) => - logError("Slave registration failed: " + message) - exitExecutor(1) + exitExecutor(1, "Slave registration failed: " + message) case LaunchTask(data) => if (executor == null) { - logError("Received LaunchTask command but executor was null") - exitExecutor(1) + exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) @@ -97,8 +100,7 @@ private[spark] class CoarseGrainedExecutorBackend( case KillTask(taskId, _, interruptThread) => if (executor == null) { - logError("Received KillTask command but executor was null") - exitExecutor(1) + exitExecutor(1, "Received KillTask command but executor was null") } else { executor.killTask(taskId, interruptThread) } @@ -127,8 +129,7 @@ private[spark] class CoarseGrainedExecutorBackend( if (stopping.get()) { logInfo(s"Driver from $remoteAddress disconnected during shutdown") } else if (driver.exists(_.address == remoteAddress)) { - logError(s"Driver $remoteAddress disassociated! Shutting down.") - exitExecutor(1) + exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.") } else { logWarning(s"An unknown ($remoteAddress) driver disconnected.") } @@ -147,7 +148,10 @@ private[spark] class CoarseGrainedExecutorBackend( * executor exits differently. For e.g. when an executor goes down, * back-end may not want to take the parent process down. */ - protected def exitExecutor(code: Int): Unit = System.exit(code) + protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = { + logError(reason, throwable) + System.exit(code) + } } private[spark] object CoarseGrainedExecutorBackend extends Logging { From 54990718ec5ca6dbf3377c3630120fcfcb98c1a3 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Fri, 15 Jul 2016 10:29:36 -0700 Subject: [PATCH 2/3] review comment --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 1cdb1df22b6d3..1c19f1f3da9e0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -149,7 +149,11 @@ private[spark] class CoarseGrainedExecutorBackend( * back-end may not want to take the parent process down. */ protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = { - logError(reason, throwable) + if (throwable == null) { + logError(reason) + } else { + logError(reason, throwable) + } System.exit(code) } } From a4f80f58694127cab846803b8162cfb0a50b7026 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Fri, 15 Jul 2016 10:31:26 -0700 Subject: [PATCH 3/3] changed order --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 1c19f1f3da9e0..e30839c49c04f 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -149,10 +149,10 @@ private[spark] class CoarseGrainedExecutorBackend( * back-end may not want to take the parent process down. */ protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = { - if (throwable == null) { - logError(reason) - } else { + if (throwable != null) { logError(reason, throwable) + } else { + logError(reason) } System.exit(code) }