From 8719ee480fe00ecaa85f3f08c8a8bc578a226bcc Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 13 Nov 2015 13:47:46 +0800
Subject: [PATCH 1/4] Fix explicitly killing executor dies silently issue

---
 .../spark/scheduler/TaskSchedulerImpl.scala   |  1 +
 .../CoarseGrainedSchedulerBackend.scala       |  5 ++++-
 .../spark/deploy/yarn/YarnAllocator.scala     | 22 +++++++++++++++++--
 3 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 43d7d80b7aae1..5f136690f456c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl(
             // If the host mapping still exists, it means we don't know the loss reason for the
             // executor. So call removeExecutor() to update tasks running on that executor when
             // the real loss reason is finally known.
+            logError(s"Actual reason for lost executor $executorId: ${reason.message}")
             removeExecutor(executorId, reason)
 
           case None =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71d98feac050..ce19ed19c5b1a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -269,13 +269,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Stop making resource offers for the given executor. The executor is marked as lost with
    * the loss reason still pending.
    *
-   * @return Whether executor was alive.
+   * @return Whether executor should be disabled
    */
   protected def disableExecutor(executorId: String): Boolean = {
     val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
       if (executorIsAlive(executorId)) {
         executorsPendingLossReason += executorId
         true
+      } else if (executorsPendingToRemove.contains(executorId)) {
+        // Returns true for explicitly killed executors, we also need to get pending loss reasons
+        true
       } else {
         false
       }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4d9e777cb4134..602f6ccdf3924 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -96,6 +96,11 @@ private[yarn] class YarnAllocator(
   // was lost.
   private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
 
+  // Executor loss reason for explicitly released executors, it will be added when executor loss
+  // reason is got from AM-RM call, and be removed after query this loss reason.
+  private val releasedExecutorLossReasons =
+    new ConcurrentHashMap[String, ExecutorLossReason]
+
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -202,8 +207,7 @@ private[yarn] class YarnAllocator(
    */
   def killExecutor(executorId: String): Unit = synchronized {
     if (executorIdToContainer.contains(executorId)) {
-      val container = executorIdToContainer.remove(executorId).get
-      containerIdToExecutorId.remove(container.getId)
+      val container = executorIdToContainer.get(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
     } else {
@@ -498,6 +502,8 @@ private[yarn] class YarnAllocator(
             s"Container $containerId exited from explicit termination request.")
         }
 
+      logInfo(s">>>>>> executor exited: $exitReason")
+
       for {
         host <- hostOpt
         containerSet <- allocatedHostToContainersMap.get(host)
@@ -523,6 +529,13 @@ private[yarn] class YarnAllocator(
           // Notify backend about the failure of the executor
           numUnexpectedContainerRelease += 1
           driverRef.send(RemoveExecutor(eid, exitReason))
+        } else {
+          // The executor exit is explicit termination request. The reason why we have to store the
+          // exit reason is that killing request may return completed containers in one AM-RM
+          // round-trip communication. So if we do not store such information, query again in
+          // another RPC call from driver to AM cannot get this information.
+          releasedExecutorLossReasons.put(eid, exitReason)
+        }
       }
     }

From 85d18d2dcec4d6d46bddd7b4bf6ad264e40d802a Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 13 Nov 2015 14:00:11 +0800
Subject: [PATCH 2/4] Remove unnecessary log

---
 .../main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 602f6ccdf3924..1c967f0de85f7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -502,8 +502,6 @@ private[yarn] class YarnAllocator(
             s"Container $containerId exited from explicit termination request.")
         }
 
-      logInfo(s">>>>>> executor exited: $exitReason")
-
       for {
         host <- hostOpt
         containerSet <- allocatedHostToContainersMap.get(host)

From 0ae6613af3cbd435559d871792ebfdf1ba867cf3 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 13 Nov 2015 16:33:00 +0800
Subject: [PATCH 3/4] refactor the codes

---
 .../spark/deploy/yarn/YarnAllocator.scala     | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1c967f0de85f7..4c97ef6b2224b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -518,22 +518,24 @@ private[yarn] class YarnAllocator(
       containerIdToExecutorId.remove(containerId).foreach { eid =>
         executorIdToContainer.remove(eid)
-        pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
-          // Notify application of executor loss reasons so it can decide whether it should abort
-          pendingRequests.foreach(_.reply(exitReason))
+        pendingLossReasonRequests.remove(eid) match {
+          case Some(pendingRequests) =>
+            // Notify application of executor loss reasons so it can decide whether it should abort
+            pendingRequests.foreach(_.reply(exitReason))
+
+          case None =>
+            // We cannot find executor for pending reasons. This is because completed container
+            // process is before than querying pending result. We should store it for later query.
+            // This is usually happened when explicitly killing a container, the result will be
+            // returned in one AM-RM communication. So query RPC will be later than this completed
+            // container process.
+            releasedExecutorLossReasons.put(eid, exitReason)
         }
 
         if (!alreadyReleased) {
           // The executor could have gone away (like no route to host, node failure, etc)
           // Notify backend about the failure of the executor
           numUnexpectedContainerRelease += 1
           driverRef.send(RemoveExecutor(eid, exitReason))
-        } else {
-          // The executor exit is explicit termination request. The reason why we have to store the
-          // exit reason is that killing request may return completed containers in one AM-RM
-          // round-trip communication. So if we do not store such information, query again in
-          // another RPC call from driver to AM cannot get this information.
-          releasedExecutorLossReasons.put(eid, exitReason)
-        }
       }
     }
   }

From 53477342d9fadc9f1da365805b81688f9b8ee5bd Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Sun, 15 Nov 2015 16:42:29 +0800
Subject: [PATCH 4/4] Address the comments

---
 .../CoarseGrainedSchedulerBackend.scala       |  7 +++----
 .../spark/deploy/yarn/YarnAllocator.scala     | 18 +++++++++---------
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ce19ed19c5b1a..3373caf0d15eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -276,11 +276,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       if (executorIsAlive(executorId)) {
         executorsPendingLossReason += executorId
         true
-      } else if (executorsPendingToRemove.contains(executorId)) {
-        // Returns true for explicitly killed executors, we also need to get pending loss reasons
-        true
       } else {
-        false
+        // Returns true for explicitly killed executors, we also need to get pending loss reasons;
+        // For others return false.
+        executorsPendingToRemove.contains(executorId)
       }
     }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4c97ef6b2224b..7e39c3ea56af3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
@@ -96,10 +96,9 @@ private[yarn] class YarnAllocator(
   // was lost.
   private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
 
-  // Executor loss reason for explicitly released executors, it will be added when executor loss
-  // reason is got from AM-RM call, and be removed after query this loss reason.
-  private val releasedExecutorLossReasons =
-    new ConcurrentHashMap[String, ExecutorLossReason]
+  // Maintain loss reasons for already released executors, it will be added when executor loss
+  // reason is got from AM-RM call, and be removed after querying this loss reason.
+  private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
 
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
@@ -525,7 +524,7 @@ private[yarn] class YarnAllocator(
 
           case None =>
             // We cannot find executor for pending reasons. This is because completed container
-            // process is before than querying pending result. We should store it for later query.
+            // is processed before querying pending result. We should store it for later query.
             // This is usually happened when explicitly killing a container, the result will be
            // returned in one AM-RM communication. So query RPC will be later than this completed
             // container process.
@@ -551,13 +550,14 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(eid)) {
       pendingLossReasonRequests
         .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
-    } else if (releasedExecutorLossReasons.get(eid) != null) {
+    } else if (releasedExecutorLossReasons.contains(eid)) {
      // Executor is already released explicitly before getting the loss reason, so directly send
       // the pre-stored lost reason
-      context.reply(releasedExecutorLossReasons.get(eid))
-      releasedExecutorLossReasons.remove(eid)
+      context.reply(releasedExecutorLossReasons.remove(eid).get)
     } else {
       logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+      context.sendFailure(
+        new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
     }
   }
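
The bookkeeping the series ends up with is easier to see outside the YARN plumbing. Below is a minimal, self-contained Scala sketch of the idea; the names (LossReasonBookkeepingSketch, onContainerCompleted, onLossReasonQuery, plain String reasons and callback functions) are illustrative placeholders, not the real YarnAllocator or RpcCallContext API. A loss-reason query that arrives before the container completes is parked in pendingLossReasonRequests, while a container that completes before any query arrives (the explicit-kill case, where the completed container can come back in the same AM-RM round trip as the kill request) has its reason cached in releasedExecutorLossReasons so the later query still gets an answer instead of being dropped.

// Standalone sketch of the two-sided loss-reason bookkeeping (simplified names,
// not the real YarnAllocator/RpcCallContext API).
import scala.collection.mutable

object LossReasonBookkeepingSketch {
  // Driver queries that arrived before the container completed.
  private val pendingLossReasonRequests =
    new mutable.HashMap[String, mutable.Buffer[String => Unit]]
  // Loss reasons for executors whose containers completed before any query arrived
  // (e.g. an explicit kill answered in the same AM-RM round trip).
  private val releasedExecutorLossReasons = new mutable.HashMap[String, String]

  // Called when the ResourceManager reports a completed container.
  def onContainerCompleted(executorId: String, exitReason: String): Unit =
    pendingLossReasonRequests.remove(executorId) match {
      case Some(callbacks) => callbacks.foreach(_.apply(exitReason)) // answer waiting queries
      case None => releasedExecutorLossReasons.put(executorId, exitReason) // keep for a later query
    }

  // Called when the driver asks why an executor was lost.
  def onLossReasonQuery(executorId: String, reply: String => Unit): Unit =
    releasedExecutorLossReasons.remove(executorId) match {
      case Some(reason) => reply(reason) // container already completed: answer immediately
      case None =>
        pendingLossReasonRequests
          .getOrElseUpdate(executorId, mutable.ArrayBuffer.empty[String => Unit]) += reply
    }

  def main(args: Array[String]): Unit = {
    // Explicit kill: the completed container arrives before the driver's query.
    onContainerCompleted("exec-1", "Container exited from explicit termination request.")
    onLossReasonQuery("exec-1", reason => println(s"exec-1 lost: $reason"))
    // Unexpected loss: the query arrives first, the completed container later.
    onLossReasonQuery("exec-2", reason => println(s"exec-2 lost: $reason"))
    onContainerCompleted("exec-2", "Container preempted by the cluster.")
  }
}

Whichever side arrives first, the driver eventually receives a concrete reason, which is the behaviour the patches aim for.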