From 7484b9dd0b31fd10ae3ea7973579eec0d6d7e8a7 Mon Sep 17 00:00:00 2001
From: twinkle sachdeva
Date: Mon, 2 Feb 2015 21:08:35 +0530
Subject: [PATCH 1/4] SPARK-4705: Doing cherry-pick of fix into master

---
 .../scala/org/apache/spark/SparkContext.scala |  4 ++-
 .../scheduler/EventLoggingListener.scala      | 26 +++++++++++++++++--
 .../spark/scheduler/SchedulerBackend.scala    |  7 +++++
 .../spark/scheduler/TaskScheduler.scala       |  8 ++++++
 .../spark/scheduler/TaskSchedulerImpl.scala   |  2 ++
 .../spark/deploy/yarn/ApplicationMaster.scala |  4 +++
 .../cluster/YarnClusterSchedulerBackend.scala |  7 ++++-
 7 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cd0c218a36fd..c2a48af4a758f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -370,6 +370,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     taskScheduler.start()
 
     val applicationId: String = taskScheduler.applicationId()
+    val applicationAttemptId : String = taskScheduler.applicationAttemptId()
     conf.set("spark.app.id", applicationId)
 
     env.blockManager.initialize(applicationId)
@@ -386,7 +387,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (isEventLogEnabled) {
       val logger =
-        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+        new EventLoggingListener(applicationId, applicationAttemptId,
+          eventLogDir.get, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 30075c172bdb1..e2003cb79d1d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 private[spark] class EventLoggingListener(
     appId: String,
+    appAttemptId: String,
     logBaseDir: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
@@ -54,6 +55,9 @@ private[spark] class EventLoggingListener(
 
   import EventLoggingListener._
 
+  def this(appId: String, logBaseDir: String, sparkConf: SparkConf, hadoopConf: Configuration) =
+    this(appId, "", logBaseDir, sparkConf, hadoopConf)
+
   def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
     this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
@@ -80,7 +84,7 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   // Visible for tests only.
-  private[scheduler] val logPath = getLogPath(logBaseDir, appId)
+  private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId)
 
   /**
    * Creates the log file in the configured log directory.
@@ -262,8 +266,26 @@ private[spark] object EventLoggingListener extends Logging {
    * @return A path which consists of file-system-safe characters.
    */
   def getLogPath(logBaseDir: String, appId: String): String = {
+    getLogPath(logBaseDir, appId, "")
+  }
+
+  /**
+   * Return a file-system-safe path to the log directory for the given application.
+   *
+   * @param logBaseDir A base directory for the path to the log directory for given application.
+   * @param appId A unique app ID.
+   * @param appAttemptId The attempt ID of the application identified by appId.
+   * @return A path which consists of file-system-safe characters.
+   */
+
+  def getLogPath(logBaseDir: String, appId: String, appAttemptId: String): String = {
     val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
-    Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+
+    if (appAttemptId.equals("")) {
+      Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+    } else {
+      Utils.resolveURI(logBaseDir) + "/" + appAttemptId + "/" + name.stripSuffix("/")
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493d8e..6a6ab0c82f310 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,11 @@ private[spark] trait SchedulerBackend {
    */
   def applicationId(): String = appId
 
+  /**
+   * Get the attempt ID for this run of the application, if the cluster manager supports it.
+   *
+   * @return The application attempt ID, or an empty string if attempts are not supported
+   */
+  def applicationAttemptId(): String = ""
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index ed3418676e077..d612409c81b6f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -78,4 +78,12 @@ private[spark] trait TaskScheduler {
    * Process a lost executor
    */
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
+
+  /**
+   * Get the attempt ID associated with this run of the application.
+   *
+   * @return The application's attempt ID
+   */
+  def applicationAttemptId(): String = ""
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7a9cf1c2e7f30..5542d0c959bae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -514,6 +514,8 @@ private[spark] class TaskSchedulerImpl(
   }
 
   override def applicationId(): String = backend.applicationId()
+
+  override def applicationAttemptId() : String = backend.applicationAttemptId()
 
 }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e966bfba7bb7d..717f58b3e1e02 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -92,6 +92,10 @@ private[spark] class ApplicationMaster(
 
       // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) + + //Propagate the attempt if, so that in case of event logging, different attempt's logs gets created in different directory + System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString()) + } logInfo("ApplicationAttemptId: " + appAttemptId) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index b1de81e6a8b0f..7662c02c8c529 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -46,5 +46,10 @@ private[spark] class YarnClusterSchedulerBackend( logError("Application ID is not set.") super.applicationId } - + + override def applicationAttemptId(): String = + sc.getConf.getOption("spark.yarn.app.attemptid").getOrElse { + logError("Application attempt ID is not set.") + super.applicationAttemptId + } } From 64a33d01b50129cc568d52b00971c79e52d28303 Mon Sep 17 00:00:00 2001 From: twinkle sachdeva Date: Wed, 25 Feb 2015 10:15:43 +0530 Subject: [PATCH 2/4] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this --- core/src/main/scala/org/apache/spark/SparkContext.scala | 7 +++---- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../scheduler/cluster/YarnClusterSchedulerBackend.scala | 3 +++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c2a48af4a758f..1f5473f6f6602 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -370,7 +370,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli taskScheduler.start() val applicationId: String = taskScheduler.applicationId() - val applicationAttemptId : String = taskScheduler.applicationAttemptId() + val applicationAttemptId: String = taskScheduler.applicationAttemptId() conf.set("spark.app.id", applicationId) env.blockManager.initialize(applicationId) @@ -386,9 +386,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Optionally log Spark events private[spark] val eventLogger: Option[EventLoggingListener] = { if (isEventLogEnabled) { - val logger = - new EventLoggingListener(applicationId, applicationAttemptId, - eventLogDir.get, conf, hadoopConfiguration) + val logger = new EventLoggingListener( + applicationId, applicationAttemptId, eventLogDir.get, conf, hadoopConfiguration) logger.start() listenerBus.addListener(logger) Some(logger) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 5542d0c959bae..7e83cc945be39 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -515,7 +515,7 @@ private[spark] class TaskSchedulerImpl( override def applicationId(): String = backend.applicationId() - override def applicationAttemptId() : String = backend.applicationAttemptId() + override def applicationAttemptId(): String = backend.applicationAttemptId() } diff --git 
From 64a33d01b50129cc568d52b00971c79e52d28303 Mon Sep 17 00:00:00 2001
From: twinkle sachdeva
Date: Wed, 25 Feb 2015 10:15:43 +0530
Subject: [PATCH 2/4] SPARK-4705 Incorporating the review comments regarding
 formatting; will do the rest of the changes after this

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 7 +++----
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 2 +-
 .../scheduler/cluster/YarnClusterSchedulerBackend.scala | 3 +++
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c2a48af4a758f..1f5473f6f6602 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -370,7 +370,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     taskScheduler.start()
 
     val applicationId: String = taskScheduler.applicationId()
-    val applicationAttemptId : String = taskScheduler.applicationAttemptId()
+    val applicationAttemptId: String = taskScheduler.applicationAttemptId()
     conf.set("spark.app.id", applicationId)
 
     env.blockManager.initialize(applicationId)
@@ -386,9 +386,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (isEventLogEnabled) {
-      val logger =
-        new EventLoggingListener(applicationId, applicationAttemptId,
-          eventLogDir.get, conf, hadoopConfiguration)
+      val logger = new EventLoggingListener(
+        applicationId, applicationAttemptId, eventLogDir.get, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5542d0c959bae..7e83cc945be39 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -515,7 +515,7 @@ private[spark] class TaskSchedulerImpl(
 
   override def applicationId(): String = backend.applicationId()
 
-  override def applicationAttemptId() : String = backend.applicationAttemptId()
+  override def applicationAttemptId(): String = backend.applicationAttemptId()
 
 }
 
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 7662c02c8c529..f20f4dcb00d64 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -48,6 +48,9 @@ private[spark] class YarnClusterSchedulerBackend(
     }
 
   override def applicationAttemptId(): String =
+    // In YARN cluster mode, spark.yarn.app.attemptid is expected to be set
+    // before the user application is launched.
+    // So, if spark.yarn.app.attemptid is not set, something is wrong.
     sc.getConf.getOption("spark.yarn.app.attemptid").getOrElse {
       logError("Application attempt ID is not set.")
       super.applicationAttemptId

From 0762e86b4398c1c71947ca8d78217441b418757b Mon Sep 17 00:00:00 2001
From: twinkle sachdeva
Date: Wed, 25 Feb 2015 14:50:33 +0530
Subject: [PATCH 3/4] SPARK-4705 Incorporating the review comments regarding
 formatting; will do the rest of the changes after this

---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 717f58b3e1e02..b77943e1b57b3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -93,7 +93,8 @@ private[spark] class ApplicationMaster(
       // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
       System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
 
-      // Propagate the attempt ID, so that in case of event logging, each attempt's logs get created in a different directory
+      // Propagate the attempt ID, so that in case of event logging,
+      // each attempt's logs get created in a different directory
       System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
 
     }
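Before patch 4, it is worth seeing the handoff that patches 1-3 establish end to end. The following self-contained sketch imitates it: the ApplicationMaster side publishes the attempt ID as a system property, and the YarnClusterSchedulerBackend side reads it back. Here sys.props stands in for sc.getConf (in a real driver, SparkConf picks up spark.* system properties when it is constructed); only the property key is taken from the patches, the rest is illustrative.

    object AttemptIdHandoffSketch {
      // What ApplicationMaster does, simplified:
      def publish(attemptId: Int): Unit =
        System.setProperty("spark.yarn.app.attemptid", attemptId.toString)

      // What YarnClusterSchedulerBackend.applicationAttemptId() effectively does:
      def read(): String =
        sys.props.getOrElse("spark.yarn.app.attemptid", {
          Console.err.println("Application attempt ID is not set.")
          "" // the default inherited from SchedulerBackend
        })

      def main(args: Array[String]): Unit = {
        publish(2)
        println(read()) // prints "2"
      }
    }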
From cc9311e8d88bf2c03fe17cca0d2833ba836202ab Mon Sep 17 00:00:00 2001
From: "twinkle.sachdeva"
Date: Sun, 1 Mar 2015 20:32:50 +0530
Subject: [PATCH 4/4] SPARK-4705: 1) Moved from directory structure to single
 file, as per the master branch. 2) Added the attempt ID inside
 SparkListenerApplicationStart, to make the info available independent of the
 directory structure. 3) Changed the History Server to render the UI as per
 snapshot II

---
 .../scala/org/apache/spark/SparkContext.scala      |   2 +-
 .../history/ApplicationHistoryProvider.scala       |   3 +-
 .../deploy/history/FsHistoryProvider.scala         |  22 ++-
 .../spark/deploy/history/HistoryPage.scala         | 137 ++++++++++++++++--
 .../scheduler/ApplicationEventListener.scala       |   2 +
 .../scheduler/EventLoggingListener.scala           |   4 +-
 .../spark/scheduler/SparkListener.scala            |   4 +-
 .../org/apache/spark/util/JsonProtocol.scala       |   6 +-
 8 files changed, 157 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1f5473f6f6602..e383dc1576817 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser))
+      startTime, sparkUser, applicationAttemptId))
   }
 
   /** Post the application end event */
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 553bf3cb945ab..e78eefe5686ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -26,7 +26,8 @@ private[spark] case class ApplicationHistoryInfo(
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
-    completed: Boolean = false)
+    completed: Boolean = false,
+    appAttemptId: String = "")
 
 private[spark] abstract class ApplicationHistoryProvider {
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3e3d6ff29faf0..2889d7a01a98d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -208,10 +208,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     if (!logInfos.isEmpty) {
       val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
       def addIfAbsent(info: FsApplicationHistoryInfo) = {
-        if (!newApps.contains(info.id) ||
-          newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
+        val key =
+          if (info.appAttemptId.equals("")) {
+            info.id
+          } else {
+            info.id + "_" + info.appAttemptId
+          }
+
+        if (!newApps.contains(key) ||
+          newApps(key).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
           !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-          newApps += (info.id -> info)
+          newApps += (key -> info)
         }
       }
 
@@ -309,7 +316,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
           appListener.endTime.getOrElse(-1L),
           getModificationTime(eventLog).get,
           appListener.sparkUser.getOrElse(NOT_STARTED),
-          isApplicationCompleted(eventLog))
+          isApplicationCompleted(eventLog),
+          appListener.appAttemptId.getOrElse(""))
       } finally {
         logInput.close()
       }
@@ -410,5 +418,7 @@ private class FsApplicationHistoryInfo(
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
-    completed: Boolean = true)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+    completed: Boolean = true,
+    appAttemptId: String = "")
+  extends ApplicationHistoryInfo(
+    id, name, startTime, endTime, lastUpdated, sparkUser, completed, appAttemptId)
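The keying scheme addIfAbsent uses above can be summarized in one illustrative helper (not part of the patch): entries without attempt info keep the bare app ID as their key, while attempts get a composite key so each attempt is tracked separately.

    def historyKey(appId: String, appAttemptId: String): String =
      if (appAttemptId.isEmpty) appId else appId + "_" + appAttemptId

    // historyKey("app-42", "")  == "app-42"
    // historyKey("app-42", "1") == "app-42_1"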
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 26ebc75971c66..9c4d9e64bb127 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -22,6 +22,9 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.ui.{WebUIPage, UIUtils}
+import scala.collection.immutable.ListMap
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.ArrayBuffer
 
 private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
 
@@ -34,18 +37,31 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
     val requestedIncomplete =
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
-    val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
-    val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
-    val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
-
+    val allCompletedAppsNAttempts =
+      parent.getApplicationList().filter(_.completed != requestedIncomplete)
+    val (hasAttemptInfo, appToAttemptMap) = getApplicationLevelList(allCompletedAppsNAttempts)
+
+    val allAppsSize = allCompletedAppsNAttempts.size
+
+    val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
+    val apps =
+      allCompletedAppsNAttempts.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))
+    val appWithAttemptsDisplayList =
+      appToAttemptMap.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))
+
     val actualPage = (actualFirst / pageSize) + 1
     val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
     val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)
 
     val secondPageFromLeft = 2
     val secondPageFromRight = pageCount - 1
 
-    val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+    val appTable =
+      if (hasAttemptInfo) {
+        UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appWithAttemptsDisplayList)
+      } else {
+        UIUtils.listingTable(appHeader, appRow, apps)
+      }
     val providerConfig = parent.getProviderConfig()
     val content =
@@ -59,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { // to the first and last page. If the current page +/- `plusOrMinus` is greater // than the 2nd page from the first page or less than the 2nd page from the last // page, `...` will be displayed. - if (allApps.size > 0) { + if (allAppsSize > 0) { val leftSideIndices = rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete) val rightSideIndices = @@ -67,7 +83,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { requestedIncomplete)

- Showing {actualFirst + 1}-{last + 1} of {allApps.size} + Showing {actualFirst + 1}-{last + 1} of {allAppsSize} {if (requestedIncomplete) "(Incomplete applications)"} { @@ -113,6 +129,36 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

     UIUtils.basicSparkPage(content, "History Server")
   }
+
+  private def getApplicationLevelList(appNattemptList: Iterable[ApplicationHistoryInfo]) = {
+    // Group the attempts of each application into a HashMap keyed by app ID.
+    // If no entry carries attempt-specific info, also return false to indicate
+    // that, so that the previous (single-row) UI gets displayed.
+    var hasAttemptInfo = false
+    val appToAttemptInfo = new HashMap[String, ArrayBuffer[ApplicationHistoryInfo]]
+    for (appAttempt <- appNattemptList) {
+      if (!appAttempt.appAttemptId.equals("")) {
+        hasAttemptInfo = true
+        val attemptId = appAttempt.appAttemptId.toInt
+        if (appToAttemptInfo.contains(appAttempt.id)) {
+          val currentAttempts = appToAttemptInfo.get(appAttempt.id).get
+          currentAttempts += appAttempt
+          appToAttemptInfo.put(appAttempt.id, currentAttempts)
+        } else {
+          val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]()
+          currentAttempts += appAttempt
+          appToAttemptInfo.put(appAttempt.id, currentAttempts)
+        }
+      } else {
+        val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]()
+        currentAttempts += appAttempt
+        appToAttemptInfo.put(appAttempt.id, currentAttempts)
+      }
+    }
+    val sortedMap = ListMap(appToAttemptInfo.toSeq.sortWith(_._1 > _._1): _*)
+    (hasAttemptInfo, sortedMap)
+  }
+
 
   private val appHeader = Seq(
     "App ID",
@@ -128,6 +174,16 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
     range.filter(condition).map(nextPage =>
       {nextPage} )
   }
+
+  private val appWithAttemptHeader = Seq(
+    "App ID",
+    "App Name",
+    "Attempt ID",
+    "Started",
+    "Completed",
+    "Duration",
+    "Spark User",
+    "Last Updated")
 
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
     val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
@@ -146,6 +202,69 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
       {lastUpdated}
   }
+
+  private def getAttemptURI(attemptInfo: ApplicationHistoryInfo,
+      returnEmptyIfAttemptInfoNull: Boolean = true) = {
+    if (attemptInfo.appAttemptId.equals("")) {
+      if (returnEmptyIfAttemptInfoNull) {
+        attemptInfo.appAttemptId
+      } else {
+        HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}"
+      }
+    } else {
+      HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}"
+    }
+  }
+
+  private def firstAttemptRow(attemptInfo: ApplicationHistoryInfo) = {
+    val uiAddress =
+      if (attemptInfo.appAttemptId.equals("")) {
+        attemptInfo.appAttemptId
+      } else {
+        HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}"
+      }
+
+    val startTime = UIUtils.formatDate(attemptInfo.startTime)
+    val endTime = UIUtils.formatDate(attemptInfo.endTime)
+    val duration = UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime)
+    val lastUpdated = UIUtils.formatDate(attemptInfo.lastUpdated)
+    val attemptId = attemptInfo.appAttemptId
+    {attemptId}
+    {startTime}
+    {endTime}
+
+      {duration}
+    {attemptInfo.sparkUser}
+    {lastUpdated}
+  }
+
+  private def attemptRow(attemptInfo: ApplicationHistoryInfo) = {
+
+    {firstAttemptRow(attemptInfo)}
+
+  }
+
+  private def appWithAttemptRow(
+      appAttemptsInfo: (String, ArrayBuffer[ApplicationHistoryInfo])): Seq[Node] = {
+    val applicationId = appAttemptsInfo._1
+    val info = appAttemptsInfo._2
+    val rowSpan = info.length
+    val rowSpanString = rowSpan.toString
+    val applicationName = info(0).name
+    val lastAttemptURI = getAttemptURI(info(0), false)
+    val ttAttempts = info.slice(1, rowSpan - 1)
+    val x = new xml.NodeBuffer
+    x +=
+
+      {applicationId}
+      {applicationName}
+      {
+        firstAttemptRow(info(0))
+      }
+      ;
+    for (i <- 1 until rowSpan) {
+      x += attemptRow(info(i))
+    }
+    x
+  }
 
   private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
     "/?" + Array(
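Stripped of the UI rendering, the grouping that getApplicationLevelList performs boils down to the sketch below, with the types reduced to (appId, attemptId) pairs for illustration. The real method also sorts the result by app ID in descending order via ListMap, which is omitted here; nothing in this snippet is from the patch itself.

    // Group a flat attempt list by app ID and report whether any entry carried
    // attempt info at all; if none did, the old single-row table is used.
    def groupAttempts(apps: Seq[(String, String)]): (Boolean, Map[String, Seq[(String, String)]]) = {
      val hasAttemptInfo = apps.exists(_._2.nonEmpty)
      (hasAttemptInfo, apps.groupBy(_._1))
    }

    // groupAttempts(Seq(("app-1", "1"), ("app-1", "2"), ("app-2", "")))
    //   == (true, Map("app-1" -> Seq(("app-1", "1"), ("app-1", "2")),
    //                 "app-2" -> Seq(("app-2", ""))))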
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 6d39a5e3fa64c..a591c7e046d5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
 private[spark] class ApplicationEventListener extends SparkListener {
   var appName: Option[String] = None
   var appId: Option[String] = None
+  var appAttemptId: Option[String] = None
   var sparkUser: Option[String] = None
   var startTime: Option[Long] = None
   var endTime: Option[Long] = None
@@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
     appName = Some(applicationStart.appName)
     appId = applicationStart.appId
+    appAttemptId = Some(applicationStart.appAttemptId)
     startTime = Some(applicationStart.time)
     sparkUser = Some(applicationStart.sparkUser)
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index e2003cb79d1d6..a6782c1406c82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -282,9 +282,9 @@ private[spark] object EventLoggingListener extends Logging {
     val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
 
     if (appAttemptId.equals("")) {
-      Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+       Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
     } else {
-      Utils.resolveURI(logBaseDir) + "/" + appAttemptId + "/" + name.stripSuffix("/")
+       Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") + "_" + appAttemptId
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index dd28ddb31de1f..e2ed4fe7b4c96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
-  sparkUser: String) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String],
+  time: Long, sparkUser: String, appAttemptId: String = "") extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
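Because the new appAttemptId parameter carries a default value, the case class change above is source-compatible: existing call sites keep compiling unchanged, and only SparkContext passes the new argument. A quick illustration, with made-up values:

    // Both forms compile against the patched case class:
    val oldStyle = SparkListenerApplicationStart("myApp", Some("app-1"), 0L, "alice")
    val newStyle = SparkListenerApplicationStart("myApp", Some("app-1"), 0L, "alice", "2")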
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e20864db5673..71a53a844ddc8 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -192,7 +192,8 @@ private[spark] object JsonProtocol {
     ("App Name" -> applicationStart.appName) ~
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
-    ("User" -> applicationStart.sparkUser)
+    ("User" -> applicationStart.sparkUser) ~
+    ("App Attempt ID" -> applicationStart.appAttemptId)
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -553,7 +554,8 @@ private[spark] object JsonProtocol {
     val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
     val time = (json \ "Timestamp").extract[Long]
     val sparkUser = (json \ "User").extract[String]
-    SparkListenerApplicationStart(appName, appId, time, sparkUser)
+    val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String]).getOrElse("")
+    SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
   }
 
   def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
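With the JsonProtocol change above, an application start event serializes roughly as follows (all values here are illustrative); since the read side falls back to an empty string, older event logs that lack the field still deserialize, just with an empty attempt ID:

    {
      "Event": "SparkListenerApplicationStart",
      "App Name": "myApp",
      "App ID": "app-20150301203250-0000",
      "Timestamp": 1425221170000,
      "User": "alice",
      "App Attempt ID": "2"
    }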