From 353a42363bd0424f38d296e75f63988b8186d422 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 10 Sep 2014 18:46:15 +0900 Subject: [PATCH 01/19] Implemented renderJson for StagePage --- .../org/apache/spark/ui/jobs/StagePage.scala | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index db01be596e073..7a20349326719 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -20,17 +20,51 @@ package org.apache.spark.ui.jobs import java.util.Date import javax.servlet.http.HttpServletRequest +import org.json4s.JsonAST.JNothing + import scala.xml.{Node, Unparsed} +import org.json4s.{JObject, JValue} +import org.json4s.JsonDSL._ + import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} import org.apache.spark.ui.jobs.UIData._ -import org.apache.spark.util.{Utils, Distribution} +import org.apache.spark.util.{JsonProtocol, Utils, Distribution} import org.apache.spark.scheduler.AccumulableInfo /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { private val listener = parent.listener + override def renderJson(request: HttpServletRequest): JValue = { + val stageId = request.getParameter("id").toInt + val stageAttemptId = request.getParameter("attempt").toInt + + val stageDataOpt = listener.stageIdToData.get((stageId, stageAttemptId)) + var retVal: JValue = JNothing + + if (!stageDataOpt.isEmpty && !stageDataOpt.get.taskData.isEmpty) { + val stageData = stageDataOpt.get + val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) + + val taskList = tasks.map { + case uiData: TaskUIData => + var jsonTaskInfo: JValue = JsonProtocol.taskInfoToJson(uiData.taskInfo) + val jsonTaskMetrics: JValue = + if (uiData.taskMetrics.isDefined) { + JsonProtocol.taskMetricsToJson(uiData.taskMetrics.get) + } else JNothing + + if (jsonTaskInfo.isInstanceOf[JObject] && jsonTaskMetrics.isInstanceOf[JObject]) { + jsonTaskInfo = jsonTaskInfo.asInstanceOf[JObject] ~ jsonTaskMetrics.asInstanceOf[JObject] + } + jsonTaskInfo + } + retVal = ("Task List" -> taskList) + } + retVal + } + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val stageId = request.getParameter("id").toInt From b8578a73082ea0c7e5b375da8aca34ff1ea21d54 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 10 Sep 2014 18:46:34 +0900 Subject: [PATCH 02/19] Implemented renderJson for EnvironmentPage --- .../apache/spark/ui/env/EnvironmentPage.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index f0a1174a71d34..a9b1bcb9fec9d 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -19,6 +19,9 @@ package org.apache.spark.ui.env import javax.servlet.http.HttpServletRequest +import org.json4s.{JObject, JValue} +import org.json4s.JsonDSL._ + import scala.xml.Node import org.apache.spark.ui.{UIUtils, WebUIPage} @@ -26,6 +29,19 @@ import org.apache.spark.ui.{UIUtils, WebUIPage} private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") { private val listener = parent.listener + override def 
renderJson(request: HttpServletRequest): JValue = { + val jvmInfoJson = ("RUntime Informationf" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _)) + val sparkPropertiesJson = ("Spark Properties" -> listener.sparkProperties.foldLeft(JObject())(_ ~ _)) + val systemPropertiesJson = ("System Properties" -> listener.systemProperties.foldLeft(JObject())(_ ~ _)) + val classPathEntriesJson = ("Classpath Entries" -> listener.classpathEntries.foldLeft(JObject())(_ ~ _)) + + val environment = ("Environment" -> jvmInfoJson ~ + sparkPropertiesJson ~ + systemPropertiesJson ~ + classPathEntriesJson) + environment + } + def render(request: HttpServletRequest): Seq[Node] = { val runtimeInformationTable = UIUtils.listingTable( propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true) From 937c8b7b80964e76b4339b44835ca00d3e0fa5ec Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 10 Sep 2014 19:07:32 +0900 Subject: [PATCH 03/19] tmp --- .../apache/spark/ui/exec/ExecutorsPage.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index b0e3bb3b552fd..c96abf42dac2b 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -19,10 +19,14 @@ package org.apache.spark.ui.exec import javax.servlet.http.HttpServletRequest +import org.apache.spark.storage.StorageStatus + import scala.xml.Node +import org.json4s.JValue + import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} -import org.apache.spark.util.Utils +import org.apache.spark.util.{JsonProtocol, Utils} /** Summary information about an executor to display in the UI. */ private case class ExecutorSummaryInfo( @@ -44,6 +48,18 @@ private case class ExecutorSummaryInfo( private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { private val listener = parent.listener + override def renderJson(request: HttpServletRequest): JValue = { + val storageStatusList = listener.storageStatusList + + val execInfoJsonList = for (statusId <- 0 until storageStatusList.size) yield { + val execInfo = getExecInfo(statusId) + + } + + ("Executor List" -> ) + + } + def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList val maxMem = storageStatusList.map(_.maxMem).sum From 55568564601f700d1fd91d82c0cafdd46bb64496 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 14:56:52 +0900 Subject: [PATCH 04/19] Implemented renderJson for ExecutorPage --- .../apache/spark/ui/exec/ExecutorsPage.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index c96abf42dac2b..ec06e92e30c62 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -19,14 +19,13 @@ package org.apache.spark.ui.exec import javax.servlet.http.HttpServletRequest -import org.apache.spark.storage.StorageStatus - import scala.xml.Node import org.json4s.JValue +import org.json4s.JsonDSL._ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} -import org.apache.spark.util.{JsonProtocol, Utils} +import org.apache.spark.util.Utils /** Summary information about an executor to display in the UI. 
*/ private case class ExecutorSummaryInfo( @@ -53,10 +52,22 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val execInfoJsonList = for (statusId <- 0 until storageStatusList.size) yield { val execInfo = getExecInfo(statusId) - + ("Executor ID" -> execInfo.id) ~ + ("Address" -> execInfo.hostPort) ~ + ("RDD Blocks" -> execInfo.rddBlocks) ~ + ("Memory Used" -> execInfo.memoryUsed) ~ + ("Disk Used" -> execInfo.diskUsed) ~ + ("Active Tasks" -> execInfo.activeTasks) ~ + ("Failed Tasks" -> execInfo.failedTasks) ~ + ("Complete Tasks" -> execInfo.completedTasks) ~ + ("TotalTasks" -> execInfo.totalTasks) ~ + ("Task Time" -> execInfo.totalDuration) ~ + ("Input" -> execInfo.totalInputBytes) ~ + ("Shuffle Read" -> execInfo.totalShuffleRead) ~ + ("Shuffle Write" -> execInfo.totalShuffleWrite) } - ("Executor List" -> ) + ("Executor List" -> execInfoJsonList) } From f7958b04508e02d0f58895a3a8cc3e0b6fef33ab Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 15:46:55 +0900 Subject: [PATCH 05/19] Implemented renderJson for JobProgressPage --- .../spark/ui/jobs/JobProgressPage.scala | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index a82f71ed08475..e23abe4e6b3ea 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -19,9 +19,14 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest +import org.apache.spark.util.JsonProtocol +import org.json4s.JValue +import org.json4s.JsonAST.JNothing +import org.json4s.JsonDSL._ + import scala.xml.{Node, NodeSeq} -import org.apache.spark.scheduler.Schedulable +import org.apache.spark.scheduler.{StageInfo, Schedulable} import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ @@ -31,6 +36,34 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler + override def renderJson(request: HttpServletRequest): JValue = { + listener.synchronized { + val activeStageList = listener.activeStages.values.map { + case info: StageInfo => + JsonProtocol.stageInfoToJson(info) + } + val activeStageJson = ("Active Stages" -> activeStageList) + + val completedStageList = listener.completedStages.reverse.map { + case info: StageInfo => + JsonProtocol.stageInfoToJson(info) + } + val completedStageJson = ("Completed Stages" -> completedStageList) + + val failedStageList = listener.failedStages.reverse.map { + case info: StageInfo => + JsonProtocol.stageInfoToJson(info) + } + val failedStageJson = ("Failed Stages" -> failedStageList) + + ("Stages" -> + activeStageJson ~ + completedStageJson ~ + failedStageJson) + + } + } + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq From 9e0010a16153538712f8009901a18b474640f967 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 16:17:49 +0900 Subject: [PATCH 06/19] Implemented renderJson for PoolPage --- .../org/apache/spark/ui/jobs/PoolPage.scala | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 7a6c7d1a497ed..9e7e39fa28c39 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -19,8 +19,13 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest +import org.apache.spark.util.JsonProtocol + import scala.xml.Node +import org.json4s.JValue +import org.json4s.JsonDSL._ + import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.{WebUIPage, UIUtils} @@ -30,6 +35,35 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { private val sc = parent.sc private val listener = parent.listener + override def renderJson(request: HttpServletRequest): JValue = { + listener.synchronized { + val poolName = request.getParameter("poolname") + val poolToActiveStages = listener.poolToActiveStages + val activeStages = poolToActiveStages.get(poolName) match { + case Some(s) => s.values.map { + case info: StageInfo => + JsonProtocol.stageInfoToJson(info) + } + case None => Seq[JValue]() + } + + val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]() + + val poolList = pools.map { + case schedulable: Schedulable => + ("Pool Name" -> schedulable.name) ~ + ("Minimum Share" -> schedulable.minShare) ~ + ("Pool Weight" -> schedulable.weight) ~ + ("Active Stages" -> activeStages) ~ + ("Running Tasks" -> schedulable.runningTasks) ~ + ("Scheduling Mode" -> schedulable.schedulingMode.toString) + } + + ("Pools" -> poolList) + + } + } + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val poolName = request.getParameter("poolname") From 88507061c3c491af47831162a14763d685df3b83 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 16:21:12 +0900 Subject: [PATCH 07/19] Modified style --- .../org/apache/spark/ui/env/EnvironmentPage.scala | 12 ++++++++---- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index a9b1bcb9fec9d..ba3c61d0094aa 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -30,10 +30,14 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") private val listener = parent.listener override def renderJson(request: HttpServletRequest): JValue = { - val jvmInfoJson = ("RUntime Informationf" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _)) - val sparkPropertiesJson = ("Spark Properties" -> listener.sparkProperties.foldLeft(JObject())(_ ~ _)) - val systemPropertiesJson = ("System Properties" -> listener.systemProperties.foldLeft(JObject())(_ ~ _)) - val classPathEntriesJson = ("Classpath Entries" -> listener.classpathEntries.foldLeft(JObject())(_ ~ _)) + val jvmInfoJson = + ("RUntime Informationf" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _)) + val sparkPropertiesJson = + ("Spark Properties" -> listener.sparkProperties.foldLeft(JObject())(_ ~ _)) + val systemPropertiesJson = + ("System Properties" -> listener.systemProperties.foldLeft(JObject())(_ ~ _)) + val classPathEntriesJson = + ("Classpath Entries" -> listener.classpathEntries.foldLeft(JObject())(_ ~ _)) val environment = ("Environment" -> jvmInfoJson ~ sparkPropertiesJson ~ diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 7a20349326719..155f1db973615 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -56,7 +56,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } else JNothing if (jsonTaskInfo.isInstanceOf[JObject] && jsonTaskMetrics.isInstanceOf[JObject]) { - jsonTaskInfo = jsonTaskInfo.asInstanceOf[JObject] ~ jsonTaskMetrics.asInstanceOf[JObject] + jsonTaskInfo = + jsonTaskInfo.asInstanceOf[JObject] ~ jsonTaskMetrics.asInstanceOf[JObject] } jsonTaskInfo } From e537be67cdbb8d02cecf57aad47e4b2fb77f98d0 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 16:30:09 +0900 Subject: [PATCH 08/19] Implemented renderJson for StoragePage --- .../org/apache/spark/ui/storage/StoragePage.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 716591c9ed449..9f14141535197 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -21,14 +21,26 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.json4s.JValue +import org.json4s.JsonDSL._ + import org.apache.spark.storage.RDDInfo import org.apache.spark.ui.{WebUIPage, UIUtils} -import org.apache.spark.util.Utils +import org.apache.spark.util.{JsonProtocol, Utils} /** Page showing list of RDD's currently stored in the cluster */ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { private val listener = parent.listener + override def renderJson(request: HttpServletRequest): JValue = { + val rddList = listener.rddInfoList.map { + case info: RDDInfo => + JsonProtocol.rddInfoToJson(info) + } + + ("RDDs" -> rddList) + } + def render(request: HttpServletRequest): Seq[Node] = { val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) From c1082008ee54aef85a023cb2551c8ca12e33feea Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 18:20:37 +0900 Subject: [PATCH 09/19] Implemented renderJson for RDDPage --- .../org/apache/spark/ui/storage/RDDPage.scala | 63 ++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 8a0075ae8daf7..f4248c069d45f 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -19,9 +19,14 @@ package org.apache.spark.ui.storage import javax.servlet.http.HttpServletRequest +import org.json4s.JsonAST.JNothing + import scala.xml.Node -import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.storage._ import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils @@ -29,6 +34,62 @@ import org.apache.spark.util.Utils private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { private val listener = parent.listener + override def renderJson(request: HttpServletRequest): JValue = { + val rddId = request.getParameter("id").toInt + val storageStatusList = 
listener.storageStatusList + val rddInfoOpt = listener.rddInfoList.find(_.id == rddId) + + var retVal: JValue = JNothing + + if (rddInfoOpt.isDefined) { + val rddInfo = rddInfoOpt.get + + val rddSummaryJson = ("RDD Summary" -> + ("RDD ID" -> rddId) ~ + ("Storage Level" -> rddInfo.storageLevel.description) ~ + ("Cached Partitions" -> rddInfo.numCachedPartitions) ~ + ("Total Partitions" -> rddInfo.numPartitions) ~ + ("Memory Size" -> rddInfo.memSize) ~ + ("Disk Size" -> rddInfo.diskSize)) + + val dataDistributionList = storageStatusList.map { + case status: StorageStatus => + ("Host" -> (status.blockManagerId.host + ":" + status.blockManagerId.port)) ~ + ("Memory Usage" -> status.memUsedByRdd(rddId)) ~ + ("Memory Remaining" -> status.memRemaining) ~ + ("Disk Usage" -> status.diskUsedByRdd(rddId)) + } + + val dataDistributionJson = ("Data Distribution" -> dataDistributionList) + + val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList) + val blocks = storageStatusList + .flatMap(_.rddBlocksById(rddId)) + .sortWith(_._1.name < _._1.name) + .map { case (blockId, status) => + (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) + } + val partitionList = blocks.map { + case (id: BlockId, block: BlockStatus, locations: Seq[String]) => + ("Block Name" -> id.toString) ~ + ("Storage Level" -> block.storageLevel.description) ~ + ("Size in Memory" -> block.memSize) ~ + ("Size on Disk" -> block.diskSize) ~ + ("Executors" -> locations) + } + val partitionsJson = ("Partitions" -> partitionList) + + + retVal = + ("RDD Info" -> + rddSummaryJson ~ + dataDistributionJson ~ + partitionsJson + ) + } + retVal + } + def render(request: HttpServletRequest): Seq[Node] = { val rddId = request.getParameter("id").toInt val storageStatusList = listener.storageStatusList From 2f8f9f3ba2296ce976ce2eaa0b24a44e720379f9 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 18:37:00 +0900 Subject: [PATCH 10/19] Added "Scheduling Mode" field in the json data returned by JobProgressPage#renderJson --- .../scala/org/apache/spark/ui/jobs/JobProgressPage.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index e23abe4e6b3ea..0866fd5bda984 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -38,6 +38,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") override def renderJson(request: HttpServletRequest): JValue = { listener.synchronized { + val activeStageList = listener.activeStages.values.map { case info: StageInfo => JsonProtocol.stageInfoToJson(info) @@ -56,11 +57,11 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") } val failedStageJson = ("Failed Stages" -> failedStageList) - ("Stages" -> + ("Stages Info" -> + ("Scheduling Mode" -> listener.schedulingMode.map(_.toString).getOrElse("Unknown")) ~ activeStageJson ~ completedStageJson ~ failedStageJson) - } } From a349d0e26765174fd9147f6fced4693d5dc48c98 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 11 Sep 2014 19:03:46 +0900 Subject: [PATCH 11/19] Added "Stage Summary" field to the json data returned from StagePage#renderJson --- .../org/apache/spark/ui/jobs/StagePage.scala | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 155f1db973615..de29e51afae3d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -40,11 +40,31 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val stageId = request.getParameter("id").toInt val stageAttemptId = request.getParameter("attempt").toInt + var stageSummary = ("Stage ID" -> stageId) ~ ("Stage Attempt ID" -> stageAttemptId) + + val stageDataOpt = listener.stageIdToData.get((stageId, stageAttemptId)) var retVal: JValue = JNothing if (!stageDataOpt.isEmpty && !stageDataOpt.get.taskData.isEmpty) { val stageData = stageDataOpt.get + + stageSummary ~= ("Executor Run Time" -> stageData.executorRunTime) + if (stageData.inputBytes > 0) stageSummary ~= ("Input Bytes" -> stageData.inputBytes) + if (stageData.shuffleReadBytes > 0) { + stageSummary ~= ("Shuffle Read Bytes" -> stageData.shuffleReadBytes) + } + + if (stageData.shuffleWriteBytes > 0) { + stageSummary ~= ("Shuffle Write bytes" -> stageData.shuffleWriteBytes) + } + + if (stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0) { + stageSummary ~= + ("Memory Bytes Spilled" -> stageData.memoryBytesSpilled) ~ + ("Disk Bytes Spilled" -> stageData.diskBytesSpilled) + } + val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) val taskList = tasks.map { @@ -61,7 +81,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } jsonTaskInfo } - retVal = ("Task List" -> taskList) + + retVal = + ("Stage Info" -> + ("StageSummary" -> stageSummary) ~ + ("Tasks" -> taskList)) } retVal } From eb49ea545d9fbe9368da6f3d2c2b96fe5d688714 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 10:24:01 +0900 Subject: [PATCH 12/19] Modified variable name to be returned in EnvironmentPage.scala Modified typos in EnvironmentPage.scala Removed Top level json field name in EnvironmentPage.scala --- .../org/apache/spark/ui/env/EnvironmentPage.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index ba3c61d0094aa..d0cc8618dc627 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -31,7 +31,7 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") override def renderJson(request: HttpServletRequest): JValue = { val jvmInfoJson = - ("RUntime Informationf" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _)) + ("Runtime Information" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _)) val sparkPropertiesJson = ("Spark Properties" -> listener.sparkProperties.foldLeft(JObject())(_ ~ _)) val systemPropertiesJson = @@ -39,11 +39,13 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") val classPathEntriesJson = ("Classpath Entries" -> listener.classpathEntries.foldLeft(JObject())(_ ~ _)) - val environment = ("Environment" -> jvmInfoJson ~ - sparkPropertiesJson ~ - systemPropertiesJson ~ - classPathEntriesJson) - environment + val environmentJson = + jvmInfoJson ~ + sparkPropertiesJson ~ + systemPropertiesJson ~ + classPathEntriesJson + + environmentJson } def render(request: HttpServletRequest): Seq[Node] = { From 
d4d8c2253411c10200353fb739625ed70a6e357f Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 11:30:46 +0900 Subject: [PATCH 13/19] Added spaces at the place where creating json in ExecutorPage.scala --- .../apache/spark/ui/exec/ExecutorsPage.scala | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index ec06e92e30c62..1779c86014554 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -49,26 +49,24 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { override def renderJson(request: HttpServletRequest): JValue = { val storageStatusList = listener.storageStatusList - val execInfoJsonList = for (statusId <- 0 until storageStatusList.size) yield { val execInfo = getExecInfo(statusId) - ("Executor ID" -> execInfo.id) ~ - ("Address" -> execInfo.hostPort) ~ - ("RDD Blocks" -> execInfo.rddBlocks) ~ - ("Memory Used" -> execInfo.memoryUsed) ~ - ("Disk Used" -> execInfo.diskUsed) ~ - ("Active Tasks" -> execInfo.activeTasks) ~ - ("Failed Tasks" -> execInfo.failedTasks) ~ - ("Complete Tasks" -> execInfo.completedTasks) ~ - ("TotalTasks" -> execInfo.totalTasks) ~ - ("Task Time" -> execInfo.totalDuration) ~ - ("Input" -> execInfo.totalInputBytes) ~ - ("Shuffle Read" -> execInfo.totalShuffleRead) ~ - ("Shuffle Write" -> execInfo.totalShuffleWrite) + ("Executor ID" -> execInfo.id) ~ + ("Address" -> execInfo.hostPort) ~ + ("RDD Blocks" -> execInfo.rddBlocks) ~ + ("Memory Used" -> execInfo.memoryUsed) ~ + ("Disk Used" -> execInfo.diskUsed) ~ + ("Active Tasks" -> execInfo.activeTasks) ~ + ("Failed Tasks" -> execInfo.failedTasks) ~ + ("Complete Tasks" -> execInfo.completedTasks) ~ + ("TotalTasks" -> execInfo.totalTasks) ~ + ("Task Time" -> execInfo.totalDuration) ~ + ("Input" -> execInfo.totalInputBytes) ~ + ("Shuffle Read" -> execInfo.totalShuffleRead) ~ + ("Shuffle Write" -> execInfo.totalShuffleWrite) } - ("Executor List" -> execInfoJsonList) - + execInfoJsonList } def render(request: HttpServletRequest): Seq[Node] = { From 36ce0ed17fbf2eb9844786e10c2434b8dd7b8b85 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 11:38:15 +0900 Subject: [PATCH 14/19] Simplified json creating logic in JobProgressPage.scala --- .../spark/ui/jobs/JobProgressPage.scala | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index 0866fd5bda984..7b5f5c3a58d06 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -39,29 +39,23 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") override def renderJson(request: HttpServletRequest): JValue = { listener.synchronized { - val activeStageList = listener.activeStages.values.map { - case info: StageInfo => - JsonProtocol.stageInfoToJson(info) - } + val activeStageList = + listener.activeStages.values.map { info => JsonProtocol.stageInfoToJson(info) } val activeStageJson = ("Active Stages" -> activeStageList) - - val completedStageList = listener.completedStages.reverse.map { - case info: StageInfo => - JsonProtocol.stageInfoToJson(info) - } + val completedStageList = 
+ listener.completedStages.reverse.map { info => JsonProtocol.stageInfoToJson(info) } val completedStageJson = ("Completed Stages" -> completedStageList) - - val failedStageList = listener.failedStages.reverse.map { - case info: StageInfo => - JsonProtocol.stageInfoToJson(info) - } + val failedStageList = + listener.failedStages.reverse.map { info => JsonProtocol.stageInfoToJson(info) } val failedStageJson = ("Failed Stages" -> failedStageList) - ("Stages Info" -> + val stageInfoJson = ("Scheduling Mode" -> listener.schedulingMode.map(_.toString).getOrElse("Unknown")) ~ activeStageJson ~ completedStageJson ~ - failedStageJson) + failedStageJson + + stageInfoJson } } From 1882f384a9e6299d500be1c30dd2c1ae410a57bd Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 11:43:30 +0900 Subject: [PATCH 15/19] Simplified PoolPage.scala --- .../scala/org/apache/spark/ui/jobs/JobProgressPage.scala | 4 ++-- .../main/scala/org/apache/spark/ui/jobs/PoolPage.scala | 9 ++++----- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 4 +--- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index 7b5f5c3a58d06..8ce8e80772873 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -20,13 +20,13 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest import org.apache.spark.util.JsonProtocol + import org.json4s.JValue -import org.json4s.JsonAST.JNothing import org.json4s.JsonDSL._ import scala.xml.{Node, NodeSeq} -import org.apache.spark.scheduler.{StageInfo, Schedulable} +import org.apache.spark.scheduler.Schedulable import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 9e7e39fa28c39..58eb4df110dce 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -49,18 +49,17 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]() - val poolList = pools.map { - case schedulable: Schedulable => + val poolListJson = + pools.map { schedulable => ("Pool Name" -> schedulable.name) ~ ("Minimum Share" -> schedulable.minShare) ~ ("Pool Weight" -> schedulable.weight) ~ ("Active Stages" -> activeStages) ~ ("Running Tasks" -> schedulable.runningTasks) ~ ("Scheduling Mode" -> schedulable.schedulingMode.toString) - } - - ("Pools" -> poolList) + } + poolListJson } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index de29e51afae3d..38ede6706aefc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -20,11 +20,9 @@ package org.apache.spark.ui.jobs import java.util.Date import javax.servlet.http.HttpServletRequest -import org.json4s.JsonAST.JNothing - import scala.xml.{Node, Unparsed} -import org.json4s.{JObject, JValue} +import org.json4s.{JNothing, JObject, JValue} import org.json4s.JsonDSL._ import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} From 
6b159ed3bcf7ef0c095c0c4016d9e4c8037ef543 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 11:46:49 +0900 Subject: [PATCH 16/19] Simplified json creating logic in StagePage.scala --- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 38ede6706aefc..59671cf47394f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -37,12 +37,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { override def renderJson(request: HttpServletRequest): JValue = { val stageId = request.getParameter("id").toInt val stageAttemptId = request.getParameter("attempt").toInt - var stageSummary = ("Stage ID" -> stageId) ~ ("Stage Attempt ID" -> stageAttemptId) - - val stageDataOpt = listener.stageIdToData.get((stageId, stageAttemptId)) - var retVal: JValue = JNothing + var stageInfoJson: JValue = JNothing if (!stageDataOpt.isEmpty && !stageDataOpt.get.taskData.isEmpty) { val stageData = stageDataOpt.get @@ -80,10 +77,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { jsonTaskInfo } - retVal = - ("Stage Info" -> - ("StageSummary" -> stageSummary) ~ - ("Tasks" -> taskList)) + stageInfoJson = ("Stage Summary" -> stageSummary) ~ ("Tasks" -> taskList) } retVal } From 270346ac49c68a0e502e18a2256a9b3a8108866a Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 11:56:49 +0900 Subject: [PATCH 17/19] Simplified json creating logic in RDDPage.scala --- .../org/apache/spark/ui/storage/RDDPage.scala | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index f4248c069d45f..4ba7a29b4b484 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -19,11 +19,9 @@ package org.apache.spark.ui.storage import javax.servlet.http.HttpServletRequest -import org.json4s.JsonAST.JNothing - import scala.xml.Node -import org.json4s.JValue +import org.json4s.{JNothing, JValue} import org.json4s.JsonDSL._ import org.apache.spark.storage._ @@ -39,7 +37,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val storageStatusList = listener.storageStatusList val rddInfoOpt = listener.rddInfoList.find(_.id == rddId) - var retVal: JValue = JNothing + var rddInfoJson: JValue = JNothing if (rddInfoOpt.isDefined) { val rddInfo = rddInfoOpt.get @@ -52,13 +50,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { ("Memory Size" -> rddInfo.memSize) ~ ("Disk Size" -> rddInfo.diskSize)) - val dataDistributionList = storageStatusList.map { - case status: StorageStatus => + val dataDistributionList = + storageStatusList.map { status => ("Host" -> (status.blockManagerId.host + ":" + status.blockManagerId.port)) ~ ("Memory Usage" -> status.memUsedByRdd(rddId)) ~ ("Memory Remaining" -> status.memRemaining) ~ ("Disk Usage" -> status.diskUsedByRdd(rddId)) - } + } val dataDistributionJson = ("Data Distribution" -> dataDistributionList) @@ -68,26 +66,25 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { .sortWith(_._1.name < _._1.name) .map { case (blockId, status) 
=> (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) - } - val partitionList = blocks.map { - case (id: BlockId, block: BlockStatus, locations: Seq[String]) => + } + + val partitionList = + blocks.map { case (id, block, locations) => ("Block Name" -> id.toString) ~ ("Storage Level" -> block.storageLevel.description) ~ ("Size in Memory" -> block.memSize) ~ ("Size on Disk" -> block.diskSize) ~ ("Executors" -> locations) - } - val partitionsJson = ("Partitions" -> partitionList) + } + val partitionsJson = ("Partitions" -> partitionList) - retVal = - ("RDD Info" -> - rddSummaryJson ~ - dataDistributionJson ~ - partitionsJson - ) + rddInfoJson = + rddSummaryJson ~ + dataDistributionJson ~ + partitionsJson } - retVal + rddInfoJson } def render(request: HttpServletRequest): Seq[Node] = { From f1b6bcfb6b04f96431c98a4e9f7a04ab600718f9 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 11:57:52 +0900 Subject: [PATCH 18/19] Simplified json creating logic in StoragePage.scala --- .../scala/org/apache/spark/ui/storage/StoragePage.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 9f14141535197..dcf851867bd7e 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -33,12 +33,10 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { private val listener = parent.listener override def renderJson(request: HttpServletRequest): JValue = { - val rddList = listener.rddInfoList.map { - case info: RDDInfo => - JsonProtocol.rddInfoToJson(info) - } + val rddJsonList = + listener.rddInfoList.map { info => JsonProtocol.rddInfoToJson(info) } - ("RDDs" -> rddList) + rddJsonList } def render(request: HttpServletRequest): Seq[Node] = { From 72c06445211579a207b36de2d4072974fe7325de Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 12 Sep 2014 12:02:19 +0900 Subject: [PATCH 19/19] Modified variable name in StagePage.scala --- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 59671cf47394f..8357ed79ee7f2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -79,7 +79,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { stageInfoJson = ("Stage Summary" -> stageSummary) ~ ("Tasks" -> taskList) } - retVal + stageInfoJson } def render(request: HttpServletRequest): Seq[Node] = {