From 66351191cae541faaa5850a72d1778b0fdb7eca1 Mon Sep 17 00:00:00 2001 From: Subroto Sanyal Date: Tue, 14 Jun 2016 10:04:34 +0200 Subject: [PATCH 1/2] SPARK-15937 The method waitForSparkContextInitialized() and runDriver() will take into account if the job has been finished or not already before declaring the job to be failed. --- .../spark/deploy/yarn/ApplicationMaster.scala | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4df90d7b6b0b8..49b900414a023 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -384,19 +384,23 @@ private[spark] class ApplicationMaster( // been set by the Thread executing the user class. val sc = waitForSparkContextInitialized() - // If there is no SparkContext at this point, just fail the app. - if (sc == null) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_SC_NOT_INITED, - "Timed out waiting for SparkContext.") - } else { - rpcEnv = sc.env.rpcEnv - val driverRef = runAMEndpoint( - sc.getConf.get("spark.driver.host"), - sc.getConf.get("spark.driver.port"), - isClusterMode = true) - registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) - userClassThread.join() + if (!finished) { + // If there is no SparkContext at this point, just fail the app.
+ if (!sc.isDefined) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_SC_NOT_INITED, + "Timed out waiting for SparkContext.") + } else { + val sparkContext = sc.get + rpcEnv = sparkContext.env.rpcEnv + val driverRef = runAMEndpoint( + sparkContext.getConf.get("spark.driver.host"), + sparkContext.getConf.get("spark.driver.port"), + isClusterMode = true) + registerAM(rpcEnv, driverRef, sparkContext.ui.map(_.appUIAddress).getOrElse(""), + securityMgr) + userClassThread.join() + } } } @@ -503,7 +507,7 @@ private[spark] class ApplicationMaster( } } - private def waitForSparkContextInitialized(): SparkContext = { + private def waitForSparkContextInitialized(): Option[SparkContext] = { logInfo("Waiting for spark context initialization") sparkContextRef.synchronized { val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) @@ -513,13 +517,16 @@ private[spark] class ApplicationMaster( logInfo("Waiting for spark context initialization ... ") sparkContextRef.wait(10000L) } - - val sparkContext = sparkContextRef.get() - if (sparkContext == null) { - logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" - + " log output for errors. Failing the application.").format(totalWaitTime)) + if (!finished) { + val sparkContext = sparkContextRef.get() + if (sparkContext == null) { + logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" + + " log output for errors. Failing the application.").format(totalWaitTime)) + } else { + Some(sparkContext) + } } - sparkContext + None } } From c0ec676f130a74d4a3b4edb7b1e8ba8dd447593f Mon Sep 17 00:00:00 2001 From: Subroto Sanyal Date: Tue, 14 Jun 2016 19:57:58 +0200 Subject: [PATCH 2/2] SPARK-15937 The method waitForSparkContextInitialized() and runDriver() will take into account if the job has been finished or not already before declaring the job to be failed.
--- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 49b900414a023..7c0b30997ecde 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -523,7 +523,7 @@ private[spark] class ApplicationMaster( logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" + " log output for errors. Failing the application.").format(totalWaitTime)) } else { - Some(sparkContext) + return Some(sparkContext) } } None