From c0aca09d676ce750496451f3691c5f9e861103bd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 19 Oct 2014 23:02:29 -0700 Subject: [PATCH 01/13] Add WebUITableBuilder to clean up table building code. This significantly simplifies / abstracts the web UI's table construction code, which seems to account for the majority of the UI code. I haven't converted all tables to use this yet; this commit just provides the basic framework and a few example usages in the master web UI. --- .../spark/deploy/master/ui/MasterPage.scala | 132 +++++----- .../scala/org/apache/spark/ui/UITables.scala | 232 ++++++++++++++++++ 2 files changed, 291 insertions(+), 73 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/ui/UITables.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 7ca3b08a28728..e59470f32edc4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -20,15 +20,15 @@ package org.apache.spark.deploy.master.ui import javax.servlet.http.HttpServletRequest import scala.concurrent.Await -import scala.xml.Node +import scala.xml.{Text, Node} import akka.pattern.ask import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} -import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo, DriverInfo} +import org.apache.spark.ui.{UITable, UITableBuilder, WebUIPage, UIUtils} import org.apache.spark.util.Utils private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { @@ -41,32 +41,71 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { JsonProtocol.writeMasterState(state) } + private val workerTable: UITable[WorkerInfo] = { + val builder = new UITableBuilder[WorkerInfo]() + import builder._ + customCol("ID") { worker => + {worker.id} + } + col("Address") { worker => s"${worker.host}:${worker.port}"} + col("State") { _.state.toString } + intCol("Cores", formatter = c => s"$c Used") { _.coresUsed } + customCol("Memory", + sortKey = Some({worker: WorkerInfo => s"${worker.memory}:${worker.memoryUsed}"})) { worker => + Text(Utils.megabytesToString(worker.memory)) ++ + Text(Utils.megabytesToString(worker.memoryUsed)) + } + build + } + + private val appTable: UITable[ApplicationInfo] = { + val builder = new UITableBuilder[ApplicationInfo]() + import builder._ + customCol("ID") { app => + {app.id} + } + col("Name") { _.id } + intCol("Cores") { _.coresGranted } + memCol("Memory per Node") { _.desc.memoryPerSlave } + dateCol("Submitted Time") { _.submitDate } + col("User") { _.desc.user } + col("State") { _.state.toString } + durationCol("Duration") { _.duration } + build + } + + private val driverTable: UITable[DriverInfo] = { + val builder = new UITableBuilder[DriverInfo]() + import builder._ + col("ID") { _.id } + dateCol("Submitted Time") { _.submitDate } + customCol("Worker") { driver => + driver.worker.map(w => {w.id.toString}).getOrElse(Text("None")) + } + col("State") { _.state.toString } + intCol("Cores") { _.desc.cores } + memCol("Memory") { _.desc.mem.toLong } + col("Main Class") { _.desc.command.arguments(1) } + build + } + /** Index view listing applications and executors */ def render(request: 
HttpServletRequest): Seq[Node] = { val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, timeout) - val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory") - val workers = state.workers.sortBy(_.id) - val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers) + val allWorkersTable = workerTable.render(state.workers.sortBy(_.id)) - val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User", - "State", "Duration") - val activeApps = state.activeApps.sortBy(_.startTime).reverse - val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps) - val completedApps = state.completedApps.sortBy(_.endTime).reverse - val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps) + val activeAppsTable = appTable.render(state.activeApps.sortBy(_.startTime).reverse) + val completedAppsTable = appTable.render(state.completedApps.sortBy(_.endTime).reverse) - val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory", - "Main Class") - val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse - val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers) - val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse - val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers) + val activeDriversTable = driverTable.render(state.activeDrivers.sortBy(_.startTime).reverse) + val completedDriversTable = + driverTable.render(state.completedDrivers.sortBy(_.startTime).reverse) // For now we only show driver information if the user has submitted drivers to the cluster. // This is until we integrate the notion of drivers and applications in the UI. - def hasDrivers = activeDrivers.length > 0 || completedDrivers.length > 0 + def hasDrivers = state.activeDrivers.length > 0 || state.completedDrivers.length > 0 val content =
@@ -93,7 +132,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {

Workers

- {workerTable} + {allWorkersTable}
@@ -138,57 +177,4 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { UIUtils.basicSparkPage(content, "Spark Master at " + state.uri) } - - private def workerRow(worker: WorkerInfo): Seq[Node] = { - - - {worker.id} - - {worker.host}:{worker.port} - {worker.state} - {worker.cores} ({worker.coresUsed} Used) - - {Utils.megabytesToString(worker.memory)} - ({Utils.megabytesToString(worker.memoryUsed)} Used) - - - } - - private def appRow(app: ApplicationInfo): Seq[Node] = { - - - {app.id} - - - {app.desc.name} - - - {app.coresGranted} - - - {Utils.megabytesToString(app.desc.memoryPerSlave)} - - {UIUtils.formatDate(app.submitDate)} - {app.desc.user} - {app.state.toString} - {UIUtils.formatDuration(app.duration)} - - } - - private def driverRow(driver: DriverInfo): Seq[Node] = { - - {driver.id} - {driver.submitDate} - {driver.worker.map(w => {w.id.toString}).getOrElse("None")} - - {driver.state} - - {driver.desc.cores} - - - {Utils.megabytesToString(driver.desc.mem.toLong)} - - {driver.desc.command.arguments(1)} - - } } diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala new file mode 100644 index 0000000000000..9e494b10a994c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import java.util.Date + +import scala.collection.mutable +import scala.xml.{Text, Node} + +import org.apache.spark.util.Utils + + +/** + * Describes how to render a column of values in a web UI table. + * + * @param name the name / title of this column + * @param formatter function that formats values for display in the table + * @param sortable if false, this column will not be sortable + * @param sortKey optional function for sorting by a key other than `formatter(value)` + * @param fieldExtractor function for extracting this field's value from the table's row data type + * @tparam T the table's row data type + * @tparam V this column's value type + */ +private case class UITableColumn[T, V]( + name: String, + formatter: V => String, + sortable: Boolean, + sortKey: Option[V => String], + fieldExtractor: T => V) { + + /** Render the TD tag for this row */ + def renderCell(row: T): Seq[Node] = { + val data = fieldExtractor(row) + val cellContents = renderCellContents(data) + Text(k(data)))}> + {cellContents} + + } + + /** Render the contents of the TD tag for this row. The contents may be a string or HTML */ + def renderCellContents(data: V): Seq[Node] = { + Text(formatter(data)) + } +} + +/** + * Describes how to render a table to display rows of type `T`. 
+ * @param cols a sequence of UITableColumns that describe how each column should be rendered + * @param fixedWidth if true, all columns of this table will be displayed with the same width + * @tparam T the row data type + */ +private[spark] class UITable[T] (cols: Seq[UITableColumn[T, _]], fixedWidth: Boolean) { + + private val tableClass = if (fixedWidth) { + UIUtils.TABLE_CLASS + " table-fixed" + } else { + UIUtils.TABLE_CLASS + } + + private val colWidthAttr = if (fixedWidth) Some(Text((100.toDouble / cols.size) + "%")) else None + + private val headerRow: Seq[Node] = { + val headers = cols.map(_.name) + // if none of the headers have "\n" in them + if (headers.forall(!_.contains("\n"))) { + // represent header as simple text + headers.map(h => {h}) + } else { + // represent header text as list while respecting "\n" + headers.map { case h => + + + + } + } + } + + private def renderRow(row: T): Seq[Node] = { + val tds = cols.map(_.renderCell(row)) + { tds } + } + + /** Render the table with the given data */ + def render(data: Seq[T]): Seq[Node] = { + val rows = data.map(renderRow) + + {headerRow} + + {rows} + +
+ } +} + +/** + * Builder for constructing web UI tables. This builder offers several advantages over constructing + * tables by hand using raw XML: + * + * - All of the table's data and formatting logic can live in one place; the table headers and + * rows aren't described in separate code. This prevents several common errors, like changing + * the ordering of two column headers but forgetting to re-order the corresponding TD tags. + * + * - No repetition of code for type-specific display rules: common column types like "memory", + * "duration", and "time" have convenience methods that implement the right formatting logic. + * + * - Details of our specific markup are generally abstracted away. For example, the markup for + * setting a custom sort key on a column now lives in one place, rather than being repeated + * in each table. + * + * The recommended way of using this class: + * + * - Create a new builder that is parametrized by the type (`T`) of data that you want to render. + * In many cases, there may be some record type like `WorkerInfo` that holds all of the + * information needed to render a particular row. If the data for each table row comes from + * several objects, you can combine those objects into a tuple or case-class. + * + * - Use the `col` methods to add columns to this builder. The final argument of each `col` method + * is a function that extracts the column's field from a row object of type `T`. Columns are + * displayed in the order that they are added to the builder. For most columns, you can write + * code like + * + * builder.col("Id") { _.id } + * builder.memCol("Memory") { _.memory } + * + * Columns have additional options, such as controlling their sort keys; see the individual + * methods' documentation for more details. + * + * - Call `build` to construct an immutable object which can be used to render tables. + * + * To remove some of the boilerplate here, you can statically import the `col` methods; for example: + * + * val myTable: UITable[MyRowDataType] = { + * val builder = new UITableBuilder[MyRowDataType]() + * import builder._ + * col("Name") { _.name } + * [...] + * build + * } + * + * There are many other features, including support for arbitrary markup in custom column types; + * see the actual uses in the web UI code for more details. + * + * @param fixedWidth if true, all columns will be rendered with the same width + * @tparam T the type of the data items that will be used to render individual rows + */ +private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) { + private val cols = mutable.Buffer[UITableColumn[T, _]]() + + /** + * Display a column with custom HTML markup. The markup should only describe how to + * render the contents of the TD tag, not the TD tag itself. 
+ */ + def customCol[V]( + name: String, + sortable: Boolean = true, + sortKey: Option[T => String] = None)(renderer: T => Seq[Node]): UITableBuilder[T] = { + val customColumn = new UITableColumn[T, T](name, null, sortable, sortKey, identity) { + override def renderCellContents(row: T) = renderer(row) + } + cols.append(customColumn) + this + } + + def col[V]( + name: String, + formatter: V => String, + sortable: Boolean = true, + sortKey: Option[V => String] = None)(fieldExtractor: T => V): UITableBuilder[T] = { + cols.append(UITableColumn(name, formatter, sortable, sortKey, fieldExtractor)) + this + } + + def col( + name: String, + sortable: Boolean = true, + sortKey: Option[String => String] = None)(fieldExtractor: T => String): UITableBuilder[T] = { + col[String](name, {x: String => x}, sortable, sortKey)(fieldExtractor) + } + + def intCol( + name: String, + formatter: Int => String = { x: Int => x.toString }, + sortable: Boolean = true)(fieldExtractor: T => Int): UITableBuilder[T] = { + col[Int](name, formatter, sortable = sortable)(fieldExtractor) + } + + /** + * Display a column of memory sizes, in megabytes, as human-readable strings, such as "4.0 MB". + */ + def memCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { + col[Long]( + name, + formatter = Utils.megabytesToString, + sortKey = Some(x => x.toString))(fieldExtractor) + } + + /** + * Display a column of dates as yyyy/MM/dd HH:mm:ss format. + */ + def dateCol(name: String)(fieldExtractor: T => Date): UITableBuilder[T] = { + col[Date](name, formatter = UIUtils.formatDate)(fieldExtractor) + } + + /** + * Display a column of durations, in milliseconds, as human-readable strings, such as "12 s". + */ + def durationCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { + col[Long](name, formatter = UIUtils.formatDuration)(fieldExtractor) + } + + def build: UITable[T] = { + val immutableCols: Seq[UITableColumn[T, _]] = cols.toSeq + new UITable[T](immutableCols, fixedWidth) + } +} \ No newline at end of file From c446116ead172e8052f1cc4e748ac21ec0dfc136 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 20:07:55 -0700 Subject: [PATCH 02/13] Change master web UI application page to use new table builder. 
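
A minimal sketch of the conversion pattern this commit applies, using a hypothetical row type and columns (not code from this patch); it assumes only the UITable/UITableBuilder API introduced in the first commit. Instead of keeping a header Seq and a separate row-rendering function in sync, one builder definition describes both the headers and the cells:

    package org.apache.spark.ui  // the builder classes are private[spark]

    // Hypothetical row type, for illustration only.
    case class ExampleRow(id: Int, host: String, memoryMb: Long)

    object UITableBuilderExample {
      val exampleTable: UITable[ExampleRow] = {
        val builder = new UITableBuilder[ExampleRow]()
        import builder._
        intCol("ID") { _.id }             // plain integer column
        col("Host") { _.host }            // plain string column
        memCol("Memory") { _.memoryMb }   // formatted via Utils.megabytesToString
        build                             // immutable UITable[ExampleRow]
      }
      // exampleTable.render(rows) emits the same kind of table markup that
      // UIUtils.listingTable(headers, rowFn, rows) previously built by hand.
    }
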
--- .../deploy/master/ui/ApplicationPage.scala | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 4588c130ef439..62d151b8ec013 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -28,7 +28,7 @@ import org.json4s.JValue import org.apache.spark.deploy.{ExecutorState, JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{UITable, UITableBuilder, UIUtils, WebUIPage} import org.apache.spark.util.Utils private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") { @@ -47,6 +47,25 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app JsonProtocol.writeApplicationInfo(app) } + private val executorsTable: UITable[ExecutorInfo] = { + val builder = new UITableBuilder[ExecutorInfo]() + import builder._ + col("ExecutorID") { _.id.toString } + customCol("Worker") { executor => + {executor.worker.id} + } + intCol("Cores") { _.cores } + memCol("Memory") { _.memory } + col("State") { _.state.toString } + customCol("Logs") { executor => + stdout + stderr + } + build + } + /** Executor details for a particular application */ def render(request: HttpServletRequest): Seq[Node] = { val appId = request.getParameter("appId") @@ -56,15 +75,14 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app state.completedApps.find(_.id == appId).getOrElse(null) }) - val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq // This includes executors that are either still running or have exited cleanly val executors = allExecutors.filter { exec => !ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED } val removedExecutors = allExecutors.diff(executors) - val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors) - val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors) + val executorsTable = this.executorsTable.render(executors) + val removedExecutorsTable = this.executorsTable.render(removedExecutors) val content =
@@ -108,22 +126,4 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
; UIUtils.basicSparkPage(content, "Application: " + app.desc.name) } - - private def executorRow(executor: ExecutorInfo): Seq[Node] = { - - {executor.id} - - {executor.worker.id} - - {executor.cores} - {executor.memory} - {executor.state} - - stdout - stderr - - - } } From 49e0e97c6b22a38102eaaa782a799478ded6b611 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 20:16:09 -0700 Subject: [PATCH 03/13] Convert master app history page to use new table builder. --- .../spark/deploy/history/HistoryPage.scala | 46 ++++++++----------- .../scala/org/apache/spark/ui/UITables.scala | 9 +++- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index d25c29113d6da..255d3f076f1a3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -21,12 +21,28 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.ui.{UITableBuilder, UITable, WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { private val pageSize = 20 + val appTable: UITable[ApplicationHistoryInfo] = { + val builder = new UITableBuilder[ApplicationHistoryInfo]() + import builder._ + customCol("App ID") { info => + val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" + {info.id} + } + col("App Name") { _.name } + epochDateCol("Started") { _.startTime } + epochDateCol("Completed") { _.endTime } + durationCol("Duration") { info => info.endTime - info.startTime } + col("Spark User") { _.sparkUser } + epochDateCol("Last Updated") { _.lastUpdated } + build + } + def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt val requestedFirst = (requestedPage - 1) * pageSize @@ -39,7 +55,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { val last = Math.min(actualFirst + pageSize, allApps.size) - 1 val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) - val appTable = UIUtils.listingTable(appHeader, appRow, apps) + val appTable = this.appTable.render(apps) val providerConfig = parent.getProviderConfig() val content =
@@ -65,30 +81,4 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
UIUtils.basicSparkPage(content, "History Server") } - - private val appHeader = Seq( - "App ID", - "App Name", - "Started", - "Completed", - "Duration", - "Spark User", - "Last Updated") - - private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" - val startTime = UIUtils.formatDate(info.startTime) - val endTime = UIUtils.formatDate(info.endTime) - val duration = UIUtils.formatDuration(info.endTime - info.startTime) - val lastUpdated = UIUtils.formatDate(info.lastUpdated) - - {info.id} - {info.name} - {startTime} - {endTime} - {duration} - {info.sparkUser} - {lastUpdated} - - } } diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala index 9e494b10a994c..a7333d5f97bd1 100644 --- a/core/src/main/scala/org/apache/spark/ui/UITables.scala +++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala @@ -98,7 +98,7 @@ private[spark] class UITable[T] (cols: Seq[UITableColumn[T, _]], fixedWidth: Boo } /** Render the table with the given data */ - def render(data: Seq[T]): Seq[Node] = { + def render(data: Iterable[T]): Seq[Node] = { val rows = data.map(renderRow) {headerRow} @@ -218,6 +218,13 @@ private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) { col[Date](name, formatter = UIUtils.formatDate)(fieldExtractor) } + /** + * Display a column of dates as yyyy/MM/dd HH:mm:ss format. + */ + def epochDateCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { + col[Long](name, formatter = UIUtils.formatDate)(fieldExtractor) + } + /** * Display a column of durations, in milliseconds, as human-readable strings, such as "12 s". */ From dec3cbcd82fd7b27b1a555fc083df98aee261252 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 20:27:04 -0700 Subject: [PATCH 04/13] Convert storage page to use new table builder --- .../apache/spark/ui/storage/StoragePage.scala | 55 +++++++------------ 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 83489ca0679ee..5da297b6bd656 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -22,46 +22,33 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.RDDInfo -import org.apache.spark.ui.{WebUIPage, UIUtils} -import org.apache.spark.util.Utils +import org.apache.spark.ui.{UITableBuilder, UITable, WebUIPage, UIUtils} /** Page showing list of RDD's currently stored in the cluster */ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { private val listener = parent.listener - def render(request: HttpServletRequest): Seq[Node] = { - val rdds = listener.rddInfoList - val content = UIUtils.listingTable(rddHeader, rddRow, rdds) - UIUtils.headerSparkPage("Storage", content, parent) + val rddTable: UITable[RDDInfo] = { + val builder = new UITableBuilder[RDDInfo]() + import builder._ + customCol("RDD Name") { rdd => + + {rdd.name} + + } + col("Storage Level") { _.storageLevel.description } + intCol("Cached Partitions") { _.numCachedPartitions } + col("Fraction Cached") { rdd => + "%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions) + } + memCol("Size in Memory") { _.memSize } + memCol("Size in Tachyon") { _.tachyonSize } + memCol("Size on Disk") { _.diskSize } + build } - /** 
Header fields for the RDD table */ - private def rddHeader = Seq( - "RDD Name", - "Storage Level", - "Cached Partitions", - "Fraction Cached", - "Size in Memory", - "Size in Tachyon", - "Size on Disk") - - /** Render an HTML row representing an RDD */ - private def rddRow(rdd: RDDInfo): Seq[Node] = { - // scalastyle:off - - - - - - - - - - // scalastyle:on + def render(request: HttpServletRequest): Seq[Node] = { + val content = rddTable.render(listener.rddInfoList) + UIUtils.headerSparkPage("Storage", content, parent) } } From e8382a3b9334445129c39be367f2abd0ed7c91f7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 23:12:59 -0700 Subject: [PATCH 05/13] Convert RDD storage page to use new table builder --- .../org/apache/spark/ui/storage/RDDPage.scala | 86 ++++++++----------- 1 file changed, 35 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 8a0075ae8daf7..50dd1c8b98b1b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -19,16 +19,47 @@ package org.apache.spark.ui.storage import javax.servlet.http.HttpServletRequest -import scala.xml.Node +import scala.xml.{Text, Node} import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.ui.{UITableBuilder, UITable, WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { private val listener = parent.listener + private val workerTable: UITable[(Int, StorageStatus)] = { + val builder = new UITableBuilder[(Int, StorageStatus)]() + import builder._ + col("Host") { case (_, status) => + s"${status.blockManagerId.host}:${status.blockManagerId.port}" + } + def getMemUsed(x: (Int, StorageStatus)): String = x._2.memUsedByRdd(x._1).toString + customCol( + "Memory usage", + sortKey = Some(getMemUsed)) { case (rddId, status) => + val used = Utils.bytesToString(status.memUsedByRdd(rddId)) + val remaining = Utils.bytesToString(status.memRemaining) + Text(s"$used ($remaining Remaining)") + } + memCol("Disk Usage") { case (rddId, status) => status.diskUsedByRdd(rddId) } + build + } + + val blockTable: UITable[(BlockId, BlockStatus, Seq[String])] = { + val builder = new UITableBuilder[(BlockId, BlockStatus, Seq[String])]() + import builder._ + col("Block Name") { case (id, block, locations) => id.toString } + col("Storage Level") { case (id, block, locations) => block.storageLevel.description } + memCol("Size in Memory") { case (id, block, locations) => block.memSize } + memCol("Size on Disk") { case (id, block, locations) => block.diskSize } + customCol("Executors") { case (id, block, locations) => + locations.map(l => {l}
) + } + build + } + def render(request: HttpServletRequest): Seq[Node] = { val rddId = request.getParameter("id").toInt val storageStatusList = listener.storageStatusList @@ -39,7 +70,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { // Worker table val workers = storageStatusList.map((rddId, _)) - val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers) + val workerTable = this.workerTable.render(workers) // Block table val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList) @@ -49,7 +80,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { .map { case (blockId, status) => (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) } - val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks) + val blockTable = this.blockTable.render(blocks) val content =
@@ -95,51 +126,4 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { UIUtils.headerSparkPage("RDD Storage Info for " + rddInfo.name, content, parent) } - - /** Header fields for the worker table */ - private def workerHeader = Seq( - "Host", - "Memory Usage", - "Disk Usage") - - /** Header fields for the block table */ - private def blockHeader = Seq( - "Block Name", - "Storage Level", - "Size in Memory", - "Size on Disk", - "Executors") - - /** Render an HTML row representing a worker */ - private def workerRow(worker: (Int, StorageStatus)): Seq[Node] = { - val (rddId, status) = worker -
- - - - - } - - /** Render an HTML row representing a block */ - private def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = { - val (id, block, locations) = row - - - - - - - - } } From 8f3839a00a6eeadf117bf5a0c7bc531adfce523a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 23:37:10 -0700 Subject: [PATCH 06/13] Convert environment page tables to use new builder --- .../apache/spark/ui/env/EnvironmentPage.scala | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index f0a1174a71d34..a3583d106a335 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -21,34 +21,31 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{UITableBuilder, UITable, UIUtils, WebUIPage} private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") { private val listener = parent.listener + private def stringPairTable(col1Name: String, col2Name: String): UITable[(Any, Any)] = { + val builder = new UITableBuilder[(Any, Any)](fixedWidth = true) + import builder._ + col(col1Name) { _._1.toString } + col(col2Name) { _._2.toString } + build + } + + private val propertyTable = stringPairTable("Name", "Value") + private val classpathTable = stringPairTable("Resource", "Source") + def render(request: HttpServletRequest): Seq[Node] = { - val runtimeInformationTable = UIUtils.listingTable( - propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true) - val sparkPropertiesTable = UIUtils.listingTable( - propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true) - val systemPropertiesTable = UIUtils.listingTable( - propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true) - val classpathEntriesTable = UIUtils.listingTable( - classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true) val content = -

Runtime Information

{runtimeInformationTable} -

Spark Properties

{sparkPropertiesTable} -

System Properties

{systemPropertiesTable} -

Classpath Entries

{classpathEntriesTable} +

Runtime Information

{propertyTable.render(listener.jvmInformation)} +

Spark Properties

{propertyTable.render(listener.sparkProperties)} +

System Properties

{propertyTable.render(listener.systemProperties)} +

Classpath Entries

{classpathTable.render(listener.classpathEntries)}
UIUtils.headerSparkPage("Environment", content, parent) } - - private def propertyHeader = Seq("Name", "Value") - private def classPathHeaders = Seq("Resource", "Source") - private def jvmRow(kv: (String, String)) =
- private def propertyRow(kv: (String, String)) = - private def classPathRow(data: (String, String)) = } From 2183e70e4f246630ff5bd9a67b46fb1cbcdd98e4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 23:47:10 -0700 Subject: [PATCH 07/13] Convert worker page tables to use new builder --- .../spark/deploy/worker/ui/WorkerPage.scala | 99 +++++++++---------- 1 file changed, 44 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index 327b905032800..ab803e83da52d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -28,7 +28,7 @@ import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.ui.{UITable, UITableBuilder, WebUIPage, UIUtils} import org.apache.spark.util.Utils private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { @@ -42,23 +42,58 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { JsonProtocol.writeWorkerState(workerState) } + private val executorTable: UITable[ExecutorRunner] = { + val builder = new UITableBuilder[ExecutorRunner]() + import builder._ + intCol("Executor ID") { _.execId } + intCol("Cores") { _.cores } + col("State") { _.state.toString } + memCol("Memory") { _.memory } + customCol("Job Details") { executor => +
    +
  • ID: {executor.appId}
  • +
  • Name: {executor.appDesc.name}
  • +
  • User: {executor.appDesc.user}
  • +
+ } + customCol("Logs") { executor => + stdout + stderr + } + build + } + + private val driverTable: UITable[DriverRunner] = { + val builder = new UITableBuilder[DriverRunner]() + import builder._ + col("Driver ID") { _.driverId } + col("Main Class") { _.driverDesc.command.arguments(1) } + col("State") { _.finalState.getOrElse(DriverState.RUNNING).toString } + intCol("Cores") { _.driverDesc.cores } + memCol("Memory") { _.driverDesc.mem } + customCol("Logs") { driver => + stdout + stderr + } + col("Notes") { _.finalException.getOrElse("").toString } + build + } + def render(request: HttpServletRequest): Seq[Node] = { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, timeout) - val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs") val runningExecutors = workerState.executors - val runningExecutorTable = - UIUtils.listingTable(executorHeaders, executorRow, runningExecutors) + val runningExecutorTable = executorTable.render(runningExecutors) val finishedExecutors = workerState.finishedExecutors - val finishedExecutorTable = - UIUtils.listingTable(executorHeaders, executorRow, finishedExecutors) + val finishedExecutorTable = executorTable.render(finishedExecutors) - val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes") val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse - val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers) + val runningDriverTable = driverTable.render(runningDrivers) val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse - val finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) + val finishedDriverTable = driverTable.render(finishedDrivers) // For now we only show driver information if the user has submitted drivers to the cluster. // This is until we integrate the notion of drivers and applications in the UI. @@ -105,50 +140,4 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format( workerState.host, workerState.port)) } - - def executorRow(executor: ExecutorRunner): Seq[Node] = { -
- - - - - - - - - } - - def driverRow(driver: DriverRunner): Seq[Node] = { - - - - - - - - - - } } From 290f58b47fd1a77b61f8ead753db534098224dbc Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Oct 2014 00:38:21 -0700 Subject: [PATCH 08/13] Convert streaming ui to use new table builder. --- .../scala/org/apache/spark/ui/UITables.scala | 14 +- .../spark/streaming/ui/StreamingPage.scala | 143 ++++++++---------- 2 files changed, 78 insertions(+), 79 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala index a7333d5f97bd1..f999a47c0f212 100644 --- a/core/src/main/scala/org/apache/spark/ui/UITables.scala +++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala @@ -229,7 +229,19 @@ private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) { * Display a column of durations, in milliseconds, as human-readable strings, such as "12 s". */ def durationCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { - col[Long](name, formatter = UIUtils.formatDuration)(fieldExtractor) + col[Long](name, formatter = UIUtils.formatDuration, sortKey = Some(_.toString))(fieldExtractor) + } + + /** + * Display a column of optional durations, in milliseconds, as human-readable strings, + * such as "12 s". If the duration is None, then '-' will be displayed. + */ + def optDurationCol(name: String)(fieldExtractor: T => Option[Long]): UITableBuilder[T] = { + col[Option[Long]]( + name, + formatter = { _.map(UIUtils.formatDuration).getOrElse("-")}, + sortKey = Some(_.getOrElse("-").toString) + )(fieldExtractor) } def build: UITable[T] = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 1353e487c72cf..e773b5a41c707 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -25,7 +25,8 @@ import scala.xml.Node import org.apache.spark.Logging import org.apache.spark.ui._ import org.apache.spark.ui.UIUtils._ -import org.apache.spark.util.Distribution +import org.apache.spark.streaming.scheduler.ReceiverInfo + /** Page for Spark Web UI that shows statistics of a streaming job */ private[ui] class StreamingPage(parent: StreamingTab) @@ -33,7 +34,7 @@ private[ui] class StreamingPage(parent: StreamingTab) private val listener = parent.listener private val startTime = Calendar.getInstance().getTime() - private val emptyCell = "-" + private val empty = "-" /** Render the page */ def render(request: HttpServletRequest): Seq[Node] = { @@ -45,6 +46,19 @@ private[ui] class StreamingPage(parent: StreamingTab) UIUtils.headerSparkPage("Streaming", content, parent, Some(5000)) } + private val batchStatsTable: UITable[(String, Option[Long], Option[Seq[Double]])] = { + val builder = new UITableBuilder[(String, Option[Long], Option[Seq[Double]])]() + import builder._ + col("Metric") { _._1 } + optDurationCol("Last batch") { _._2 } + optDurationCol("Minimum") { _._3.map(_(0).toLong) } + optDurationCol("25th percentile") { _._3.map(_(1).toLong) } + optDurationCol("Median") { _._3.map(_(2).toLong) } + optDurationCol("75th percentile") { _._3.map(_(3).toLong) } + optDurationCol("Maximum") { _._3.map(_(4).toLong) } + build + } + /** Generate basic stats of the streaming program */ private def generateBasicStats(): Seq[Node] = { val timeSinceStart = System.currentTimeMillis() - startTime.getTime @@ 
-75,37 +89,45 @@ private[ui] class StreamingPage(parent: StreamingTab) val receivedRecordDistributions = listener.receivedRecordsDistributions val lastBatchReceivedRecord = listener.lastReceivedBatchRecords val table = if (receivedRecordDistributions.size > 0) { - val headerRow = Seq( - "Receiver", - "Status", - "Location", - "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]", - "Minimum rate\n[records/sec]", - "Median rate\n[records/sec]", - "Maximum rate\n[records/sec]", - "Last Error" - ) - val dataRows = (0 until listener.numReceivers).map { receiverId => - val receiverInfo = listener.receiverInfo(receiverId) - val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId") - val receiverActive = receiverInfo.map { info => - if (info.active) "ACTIVE" else "INACTIVE" - }.getOrElse(emptyCell) - val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell) - val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId)) - val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong)) - }.getOrElse { - Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell) + val tableRenderer: UITable[(Int, Option[ReceiverInfo])] = { + val builder = new UITableBuilder[(Int, Option[ReceiverInfo])]() + import builder._ + col("Receiver") { case (receiverId, receiverInfo) => + receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId") + } + col("Status") { case (_, receiverInfo) => + receiverInfo.map { info => if (info.active) "ACTIVE" else "INACTIVE" }.getOrElse(empty) + } + col("Location") { case (_, receiverInfo) => receiverInfo.map(_.location).getOrElse(empty) } + col("Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]") { + case (receiverId, _) => formatNumber(lastBatchReceivedRecord(receiverId)) } - val receiverLastError = listener.receiverInfo(receiverId).map { info => - val msg = s"${info.lastErrorMessage} - ${info.lastError}" - if (msg.size > 100) msg.take(97) + "..." else msg - }.getOrElse(emptyCell) - Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++ - receivedRecordStats ++ Seq(receiverLastError) + col("Minimum rate\n[records/sec]") { + case (receiverId, _) => receivedRecordDistributions(receiverId).map { + _.getQuantiles(Seq(0.0)).map(formatNumber).head + }.getOrElse(empty) + } + col("Median rate\n[records/sec]") { + case (receiverId, _) => receivedRecordDistributions(receiverId).map { + _.getQuantiles(Seq(0.5)).map(formatNumber).head + }.getOrElse(empty) + } + col("Maximum rate\n[records/sec]") { + case (receiverId, _) => receivedRecordDistributions(receiverId).map { + _.getQuantiles(Seq(1.0)).map(formatNumber).head + }.getOrElse(empty) + } + col("Last Error") { + case (_, receiverInfo) => receiverInfo.map { info => + val msg = s"${info.lastErrorMessage} - ${info.lastError}" + if (msg.size > 100) msg.take(97) + "..." 
else msg + }.getOrElse(empty) + } + build } - Some(listingTable(headerRow, dataRows)) + + val dataRows = (0 until listener.numReceivers).map { id => (id, listener.receiverInfo(id)) } + Some(tableRenderer.render(dataRows)) } else { None } @@ -121,33 +143,20 @@ private[ui] class StreamingPage(parent: StreamingTab) private def generateBatchStatsTable(): Seq[Node] = { val numBatches = listener.retainedCompletedBatches.size val lastCompletedBatch = listener.lastCompletedBatch + val table = if (numBatches > 0) { - val processingDelayQuantilesRow = { - Seq( - "Processing Time", - formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay)) - ) ++ getQuantiles(listener.processingDelayDistribution) - } - val schedulingDelayQuantilesRow = { - Seq( - "Scheduling Delay", - formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay)) - ) ++ getQuantiles(listener.schedulingDelayDistribution) - } - val totalDelayQuantilesRow = { - Seq( - "Total Delay", - formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay)) - ) ++ getQuantiles(listener.totalDelayDistribution) - } - val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile", - "Median", "75th percentile", "Maximum") - val dataRows: Seq[Seq[String]] = Seq( - processingDelayQuantilesRow, - schedulingDelayQuantilesRow, - totalDelayQuantilesRow + val rows: Seq[(String, Option[Long], Option[Seq[Double]])] = Seq( + ("Processing Time", + lastCompletedBatch.flatMap(_.processingDelay), + listener.processingDelayDistribution.map(_.getQuantiles())), + ("Scheduling Delay", + lastCompletedBatch.flatMap(_.schedulingDelay), + listener.schedulingDelayDistribution.map(_.getQuantiles())), + ("Total Delay", + lastCompletedBatch.flatMap(_.totalDelay), + listener.totalDelayDistribution.map(_.getQuantiles())) ) - Some(listingTable(headerRow, dataRows)) + Some(batchStatsTable.render(rows)) } else { None } @@ -162,26 +171,4 @@ private[ui] class StreamingPage(parent: StreamingTab) content } - - - /** - * Returns a human-readable string representing a duration such as "5 second 35 ms" - */ - private def formatDurationOption(msOption: Option[Long]): String = { - msOption.map(formatDurationVerbose).getOrElse(emptyCell) - } - - /** Get quantiles for any time distribution */ - private def getQuantiles(timeDistributionOption: Option[Distribution]) = { - timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) } - } - - /** Generate HTML table from string data */ - private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = { - def generateDataRow(data: Seq[String]): Seq[Node] = { - {data.map(d => )} - } - UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true) - } } - From 292a4bdcf9d0d6bad0e06b3576eda6f1ad3b63fd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Oct 2014 16:14:14 -0700 Subject: [PATCH 09/13] Don't import builder._, since this may be confusing. Make build() into a paren method. 
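
A short sketch of the new style, with a hypothetical row type (not code from this patch): every column is added through the explicitly named builder value, and the finished table comes from the paren `build()` call rather than a field-like `build`.

    // Hypothetical row type, for illustration only.
    case class ExampleRow(host: String, cores: Int, memoryMb: Long)

    val exampleTable: UITable[ExampleRow] = {
      val t = new UITableBuilder[ExampleRow]()
      t.col("Host") { _.host }
      t.intCol("Cores") { _.cores }
      t.memCol("Memory") { _.memoryMb }   // memCol is renamed to sizeCol in the next commit
      t.build()
    }
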
--- .../spark/deploy/history/HistoryPage.scala | 19 +++---- .../deploy/master/ui/ApplicationPage.scala | 17 +++--- .../spark/deploy/master/ui/MasterPage.scala | 55 +++++++++---------- .../spark/deploy/worker/ui/WorkerPage.scala | 36 ++++++------ .../scala/org/apache/spark/ui/UITables.scala | 4 +- .../apache/spark/ui/env/EnvironmentPage.scala | 9 ++- .../org/apache/spark/ui/storage/RDDPage.scala | 26 ++++----- .../apache/spark/ui/storage/StoragePage.scala | 19 +++---- .../spark/streaming/ui/StreamingPage.scala | 40 +++++++------- 9 files changed, 106 insertions(+), 119 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 255d3f076f1a3..adaf6cf12a362 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -28,19 +28,18 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { private val pageSize = 20 val appTable: UITable[ApplicationHistoryInfo] = { - val builder = new UITableBuilder[ApplicationHistoryInfo]() - import builder._ - customCol("App ID") { info => + val t = new UITableBuilder[ApplicationHistoryInfo]() + t.customCol("App ID") { info => val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" {info.id} } - col("App Name") { _.name } - epochDateCol("Started") { _.startTime } - epochDateCol("Completed") { _.endTime } - durationCol("Duration") { info => info.endTime - info.startTime } - col("Spark User") { _.sparkUser } - epochDateCol("Last Updated") { _.lastUpdated } - build + t.col("App Name") { _.name } + t.epochDateCol("Started") { _.startTime } + t.epochDateCol("Completed") { _.endTime } + t.durationCol("Duration") { info => info.endTime - info.startTime } + t.col("Spark User") { _.sparkUser } + t.epochDateCol("Last Updated") { _.lastUpdated } + t.build() } def render(request: HttpServletRequest): Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 62d151b8ec013..8d4ccd8e5ee53 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -48,22 +48,21 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app } private val executorsTable: UITable[ExecutorInfo] = { - val builder = new UITableBuilder[ExecutorInfo]() - import builder._ - col("ExecutorID") { _.id.toString } - customCol("Worker") { executor => + val t = new UITableBuilder[ExecutorInfo]() + t.col("ExecutorID") { _.id.toString } + t.customCol("Worker") { executor => {executor.worker.id} } - intCol("Cores") { _.cores } - memCol("Memory") { _.memory } - col("State") { _.state.toString } - customCol("Logs") { executor => + t.intCol("Cores") { _.cores } + t.memCol("Memory") { _.memory } + t.col("State") { _.state.toString } + t.customCol("Logs") { executor => stdout stderr } - build + t.build() } /** Executor details for a particular application */ diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index e59470f32edc4..b67a5d0def50a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -42,51 
+42,48 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { } private val workerTable: UITable[WorkerInfo] = { - val builder = new UITableBuilder[WorkerInfo]() - import builder._ - customCol("ID") { worker => + val t = new UITableBuilder[WorkerInfo]() + t.customCol("ID") { worker => {worker.id} } - col("Address") { worker => s"${worker.host}:${worker.port}"} - col("State") { _.state.toString } - intCol("Cores", formatter = c => s"$c Used") { _.coresUsed } - customCol("Memory", + t.col("Address") { worker => s"${worker.host}:${worker.port}"} + t.col("State") { _.state.toString } + t.intCol("Cores", formatter = c => s"$c Used") { _.coresUsed } + t.customCol("Memory", sortKey = Some({worker: WorkerInfo => s"${worker.memory}:${worker.memoryUsed}"})) { worker => Text(Utils.megabytesToString(worker.memory)) ++ Text(Utils.megabytesToString(worker.memoryUsed)) } - build + t.build() } private val appTable: UITable[ApplicationInfo] = { - val builder = new UITableBuilder[ApplicationInfo]() - import builder._ - customCol("ID") { app => + val t = new UITableBuilder[ApplicationInfo]() + t.customCol("ID") { app => {app.id} } - col("Name") { _.id } - intCol("Cores") { _.coresGranted } - memCol("Memory per Node") { _.desc.memoryPerSlave } - dateCol("Submitted Time") { _.submitDate } - col("User") { _.desc.user } - col("State") { _.state.toString } - durationCol("Duration") { _.duration } - build + t.col("Name") { _.id } + t.intCol("Cores") { _.coresGranted } + t.memCol("Memory per Node") { _.desc.memoryPerSlave } + t.dateCol("Submitted Time") { _.submitDate } + t.col("User") { _.desc.user } + t.col("State") { _.state.toString } + t.durationCol("Duration") { _.duration } + t.build() } private val driverTable: UITable[DriverInfo] = { - val builder = new UITableBuilder[DriverInfo]() - import builder._ - col("ID") { _.id } - dateCol("Submitted Time") { _.submitDate } - customCol("Worker") { driver => + val t = new UITableBuilder[DriverInfo]() + t.col("ID") { _.id } + t.dateCol("Submitted Time") { _.submitDate } + t.customCol("Worker") { driver => driver.worker.map(w => {w.id.toString}).getOrElse(Text("None")) } - col("State") { _.state.toString } - intCol("Cores") { _.desc.cores } - memCol("Memory") { _.desc.mem.toLong } - col("Main Class") { _.desc.command.arguments(1) } - build + t.col("State") { _.state.toString } + t.intCol("Cores") { _.desc.cores } + t.memCol("Memory") { _.desc.mem.toLong } + t.col("Main Class") { _.desc.command.arguments(1) } + t.build() } /** Index view listing applications and executors */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index ab803e83da52d..064265193a991 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -43,42 +43,40 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { } private val executorTable: UITable[ExecutorRunner] = { - val builder = new UITableBuilder[ExecutorRunner]() - import builder._ - intCol("Executor ID") { _.execId } - intCol("Cores") { _.cores } - col("State") { _.state.toString } - memCol("Memory") { _.memory } - customCol("Job Details") { executor => + val t = new UITableBuilder[ExecutorRunner]() + t.intCol("Executor ID") { _.execId } + t.intCol("Cores") { _.cores } + t.col("State") { _.state.toString } + t.memCol("Memory") { _.memory } + t.customCol("Job Details") { executor =>
  • ID: {executor.appId}
  • Name: {executor.appDesc.name}
  • User: {executor.appDesc.user}
} - customCol("Logs") { executor => + t.customCol("Logs") { executor => stdout stderr } - build + t.build() } private val driverTable: UITable[DriverRunner] = { - val builder = new UITableBuilder[DriverRunner]() - import builder._ - col("Driver ID") { _.driverId } - col("Main Class") { _.driverDesc.command.arguments(1) } - col("State") { _.finalState.getOrElse(DriverState.RUNNING).toString } - intCol("Cores") { _.driverDesc.cores } - memCol("Memory") { _.driverDesc.mem } - customCol("Logs") { driver => + val t = new UITableBuilder[DriverRunner]() + t.col("Driver ID") { _.driverId } + t.col("Main Class") { _.driverDesc.command.arguments(1) } + t.col("State") { _.finalState.getOrElse(DriverState.RUNNING).toString } + t.intCol("Cores") { _.driverDesc.cores } + t.memCol("Memory") { _.driverDesc.mem } + t.customCol("Logs") { driver => stdout stderr } - col("Notes") { _.finalException.getOrElse("").toString } - build + t.col("Notes") { _.finalException.getOrElse("").toString } + t.build() } def render(request: HttpServletRequest): Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala index f999a47c0f212..96c0279fa942f 100644 --- a/core/src/main/scala/org/apache/spark/ui/UITables.scala +++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala @@ -244,8 +244,8 @@ private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) { )(fieldExtractor) } - def build: UITable[T] = { + def build(): UITable[T] = { val immutableCols: Seq[UITableColumn[T, _]] = cols.toSeq new UITable[T](immutableCols, fixedWidth) } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index a3583d106a335..4f9b35a591340 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -27,11 +27,10 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") private val listener = parent.listener private def stringPairTable(col1Name: String, col2Name: String): UITable[(Any, Any)] = { - val builder = new UITableBuilder[(Any, Any)](fixedWidth = true) - import builder._ - col(col1Name) { _._1.toString } - col(col2Name) { _._2.toString } - build + val t = new UITableBuilder[(Any, Any)](fixedWidth = true) + t.col(col1Name) { _._1.toString } + t.col(col2Name) { _._2.toString } + t.build() } private val propertyTable = stringPairTable("Name", "Value") diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 50dd1c8b98b1b..365cec554facd 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -30,34 +30,32 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { private val listener = parent.listener private val workerTable: UITable[(Int, StorageStatus)] = { - val builder = new UITableBuilder[(Int, StorageStatus)]() - import builder._ - col("Host") { case (_, status) => + val t = new UITableBuilder[(Int, StorageStatus)]() + t.col("Host") { case (_, status) => s"${status.blockManagerId.host}:${status.blockManagerId.port}" } def getMemUsed(x: (Int, StorageStatus)): String = x._2.memUsedByRdd(x._1).toString - customCol( + t.customCol( "Memory usage", sortKey = Some(getMemUsed)) { case (rddId, status) => val used = 
Utils.bytesToString(status.memUsedByRdd(rddId)) val remaining = Utils.bytesToString(status.memRemaining) Text(s"$used ($remaining Remaining)") } - memCol("Disk Usage") { case (rddId, status) => status.diskUsedByRdd(rddId) } - build + t.memCol("Disk Usage") { case (rddId, status) => status.diskUsedByRdd(rddId) } + t.build() } val blockTable: UITable[(BlockId, BlockStatus, Seq[String])] = { - val builder = new UITableBuilder[(BlockId, BlockStatus, Seq[String])]() - import builder._ - col("Block Name") { case (id, block, locations) => id.toString } - col("Storage Level") { case (id, block, locations) => block.storageLevel.description } - memCol("Size in Memory") { case (id, block, locations) => block.memSize } - memCol("Size on Disk") { case (id, block, locations) => block.diskSize } - customCol("Executors") { case (id, block, locations) => + val t = new UITableBuilder[(BlockId, BlockStatus, Seq[String])]() + t.col("Block Name") { case (id, block, locations) => id.toString } + t.col("Storage Level") { case (id, block, locations) => block.storageLevel.description } + t. memCol("Size in Memory") { case (id, block, locations) => block.memSize } + t.memCol("Size on Disk") { case (id, block, locations) => block.diskSize } + t.customCol("Executors") { case (id, block, locations) => locations.map(l => {l}
) } - build + t.build() } def render(request: HttpServletRequest): Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 5da297b6bd656..deee4a06fbcc6 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -29,22 +29,21 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { private val listener = parent.listener val rddTable: UITable[RDDInfo] = { - val builder = new UITableBuilder[RDDInfo]() - import builder._ - customCol("RDD Name") { rdd => + val t = new UITableBuilder[RDDInfo]() + t.customCol("RDD Name") { rdd => {rdd.name} } - col("Storage Level") { _.storageLevel.description } - intCol("Cached Partitions") { _.numCachedPartitions } - col("Fraction Cached") { rdd => + t.col("Storage Level") { _.storageLevel.description } + t.intCol("Cached Partitions") { _.numCachedPartitions } + t. col("Fraction Cached") { rdd => "%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions) } - memCol("Size in Memory") { _.memSize } - memCol("Size in Tachyon") { _.tachyonSize } - memCol("Size on Disk") { _.diskSize } - build + t.memCol("Size in Memory") { _.memSize } + t.memCol("Size in Tachyon") { _.tachyonSize } + t.memCol("Size on Disk") { _.diskSize } + t.build() } def render(request: HttpServletRequest): Seq[Node] = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index e773b5a41c707..d40334af1dcd0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -47,16 +47,15 @@ private[ui] class StreamingPage(parent: StreamingTab) } private val batchStatsTable: UITable[(String, Option[Long], Option[Seq[Double]])] = { - val builder = new UITableBuilder[(String, Option[Long], Option[Seq[Double]])]() - import builder._ - col("Metric") { _._1 } - optDurationCol("Last batch") { _._2 } - optDurationCol("Minimum") { _._3.map(_(0).toLong) } - optDurationCol("25th percentile") { _._3.map(_(1).toLong) } - optDurationCol("Median") { _._3.map(_(2).toLong) } - optDurationCol("75th percentile") { _._3.map(_(3).toLong) } - optDurationCol("Maximum") { _._3.map(_(4).toLong) } - build + val t = new UITableBuilder[(String, Option[Long], Option[Seq[Double]])]() + t.col("Metric") { _._1 } + t.optDurationCol("Last batch") { _._2 } + t.optDurationCol("Minimum") { _._3.map(_(0).toLong) } + t.optDurationCol("25th percentile") { _._3.map(_(1).toLong) } + t.optDurationCol("Median") { _._3.map(_(2).toLong) } + t.optDurationCol("75th percentile") { _._3.map(_(3).toLong) } + t.optDurationCol("Maximum") { _._3.map(_(4).toLong) } + t.build() } /** Generate basic stats of the streaming program */ @@ -90,40 +89,39 @@ private[ui] class StreamingPage(parent: StreamingTab) val lastBatchReceivedRecord = listener.lastReceivedBatchRecords val table = if (receivedRecordDistributions.size > 0) { val tableRenderer: UITable[(Int, Option[ReceiverInfo])] = { - val builder = new UITableBuilder[(Int, Option[ReceiverInfo])]() - import builder._ - col("Receiver") { case (receiverId, receiverInfo) => + val t = new UITableBuilder[(Int, Option[ReceiverInfo])]() + t.col("Receiver") { case (receiverId, receiverInfo) => receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId") } - 
col("Status") { case (_, receiverInfo) => + t.col("Status") { case (_, receiverInfo) => receiverInfo.map { info => if (info.active) "ACTIVE" else "INACTIVE" }.getOrElse(empty) } - col("Location") { case (_, receiverInfo) => receiverInfo.map(_.location).getOrElse(empty) } - col("Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]") { + t.col("Location") { case (_, receiverInfo) => receiverInfo.map(_.location).getOrElse(empty) } + t.col("Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]") { case (receiverId, _) => formatNumber(lastBatchReceivedRecord(receiverId)) } - col("Minimum rate\n[records/sec]") { + t.col("Minimum rate\n[records/sec]") { case (receiverId, _) => receivedRecordDistributions(receiverId).map { _.getQuantiles(Seq(0.0)).map(formatNumber).head }.getOrElse(empty) } - col("Median rate\n[records/sec]") { + t.col("Median rate\n[records/sec]") { case (receiverId, _) => receivedRecordDistributions(receiverId).map { _.getQuantiles(Seq(0.5)).map(formatNumber).head }.getOrElse(empty) } - col("Maximum rate\n[records/sec]") { + t.col("Maximum rate\n[records/sec]") { case (receiverId, _) => receivedRecordDistributions(receiverId).map { _.getQuantiles(Seq(1.0)).map(formatNumber).head }.getOrElse(empty) } - col("Last Error") { + t.col("Last Error") { case (_, receiverInfo) => receiverInfo.map { info => val msg = s"${info.lastErrorMessage} - ${info.lastError}" if (msg.size > 100) msg.take(97) + "..." else msg }.getOrElse(empty) } - build + t.build() } val dataRows = (0 until listener.numReceivers).map { id => (id, listener.receiverInfo(id)) } From b02c82a7ca6ecd75d65c6c945a1574fecfb22f76 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Oct 2014 16:17:50 -0700 Subject: [PATCH 10/13] More code style cleanup; - memCol -> sizeCol - 4 spaces for parameter indentation - move `case` statements --- .../deploy/master/ui/ApplicationPage.scala | 2 +- .../spark/deploy/master/ui/MasterPage.scala | 4 +- .../spark/deploy/worker/ui/WorkerPage.scala | 4 +- .../scala/org/apache/spark/ui/UITables.scala | 44 +++++++------------ .../org/apache/spark/ui/storage/RDDPage.scala | 8 ++-- .../apache/spark/ui/storage/StoragePage.scala | 6 +-- .../spark/streaming/ui/StreamingPage.scala | 16 +++---- 7 files changed, 37 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 8d4ccd8e5ee53..3e6ad7cdf9bb6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -54,7 +54,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app {executor.worker.id} } t.intCol("Cores") { _.cores } - t.memCol("Memory") { _.memory } + t.sizeCol("Memory") { _.memory } t.col("State") { _.state.toString } t.customCol("Logs") { executor =>
  • ID: {executor.appId}
@@ -70,7 +70,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { t.col("Main Class") { _.driverDesc.command.arguments(1) } t.col("State") { _.finalState.getOrElse(DriverState.RUNNING).toString } t.intCol("Cores") { _.driverDesc.cores } - t.memCol("Memory") { _.driverDesc.mem } + t.sizeCol("Memory") { _.driverDesc.mem } t.customCol("Logs") { driver =>
    stdout stderr diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala index 96c0279fa942f..61b3e0706c9c6 100644 --- a/core/src/main/scala/org/apache/spark/ui/UITables.scala +++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala @@ -20,7 +20,7 @@ package org.apache.spark.ui import java.util.Date import scala.collection.mutable -import scala.xml.{Text, Node} +import scala.xml.{Node, Text} import org.apache.spark.util.Utils @@ -142,17 +142,7 @@ private[spark] class UITable[T] (cols: Seq[UITableColumn[T, _]], fixedWidth: Boo * Columns have additional options, such as controlling their sort keys; see the individual * methods' documentation for more details. * - * - Call `build` to construct an immutable object which can be used to render tables. - * * - * To remove some of the boilerplate here, you can statically import the `col` methods; for example: - * - * val myTable: UITable[MyRowDataType] = { - * val builder = new UITableBuilder[MyRowDataType]() - * import builder._ - * col("Name") { _.name } - * [...] - * build - * } + * - Call `build()` to construct an immutable object which can be used to render tables. * * There are many other features, including support for arbitrary markup in custom column types; * see the actual uses in the web UI code for more details. @@ -168,9 +158,9 @@ private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) { * render the contents of the TD tag, not the TD tag itself. */ def customCol[V]( - name: String, - sortable: Boolean = true, - sortKey: Option[T => String] = None)(renderer: T => Seq[Node]): UITableBuilder[T] = { + name: String, + sortable: Boolean = true, + sortKey: Option[T => String] = None)(renderer: T => Seq[Node]): UITableBuilder[T] = { val customColumn = new UITableColumn[T, T](name, null, sortable, sortKey, identity) { override def renderCellContents(row: T) = renderer(row) } @@ -179,32 +169,32 @@ private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) { } def col[V]( - name: String, - formatter: V => String, - sortable: Boolean = true, - sortKey: Option[V => String] = None)(fieldExtractor: T => V): UITableBuilder[T] = { + name: String, + formatter: V => String, + sortable: Boolean = true, + sortKey: Option[V => String] = None)(fieldExtractor: T => V): UITableBuilder[T] = { cols.append(UITableColumn(name, formatter, sortable, sortKey, fieldExtractor)) this } def col( - name: String, - sortable: Boolean = true, - sortKey: Option[String => String] = None)(fieldExtractor: T => String): UITableBuilder[T] = { + name: String, + sortable: Boolean = true, + sortKey: Option[String => String] = None)(fieldExtractor: T => String): UITableBuilder[T] = { col[String](name, {x: String => x}, sortable, sortKey)(fieldExtractor) } def intCol( - name: String, - formatter: Int => String = { x: Int => x.toString }, - sortable: Boolean = true)(fieldExtractor: T => Int): UITableBuilder[T] = { + name: String, + formatter: Int => String = { x: Int => x.toString }, + sortable: Boolean = true)(fieldExtractor: T => Int): UITableBuilder[T] = { col[Int](name, formatter, sortable = sortable)(fieldExtractor) } /** - * Display a column of memory sizes, in megabytes, as human-readable strings, such as "4.0 MB". + * Display a column of sizes, in megabytes, as human-readable strings, such as "4.0 MB". 
*/ - def memCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { + def sizeCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { col[Long]( name, formatter = Utils.megabytesToString, diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 365cec554facd..fb52aac2caf82 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -42,16 +42,16 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val remaining = Utils.bytesToString(status.memRemaining) Text(s"$used ($remaining Remaining)") } - t.memCol("Disk Usage") { case (rddId, status) => status.diskUsedByRdd(rddId) } + t.sizeCol("Disk Usage") { case (rddId, status) => status.diskUsedByRdd(rddId) } t.build() } - val blockTable: UITable[(BlockId, BlockStatus, Seq[String])] = { + private val blockTable: UITable[(BlockId, BlockStatus, Seq[String])] = { val t = new UITableBuilder[(BlockId, BlockStatus, Seq[String])]() t.col("Block Name") { case (id, block, locations) => id.toString } t.col("Storage Level") { case (id, block, locations) => block.storageLevel.description } - t. memCol("Size in Memory") { case (id, block, locations) => block.memSize } - t.memCol("Size on Disk") { case (id, block, locations) => block.diskSize } + t. sizeCol("Size in Memory") { case (id, block, locations) => block.memSize } + t.sizeCol("Size on Disk") { case (id, block, locations) => block.diskSize } t.customCol("Executors") { case (id, block, locations) => locations.map(l => {l}
    ) } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index deee4a06fbcc6..b3ddf8a87480a 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -40,9 +40,9 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { t. col("Fraction Cached") { rdd => "%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions) } - t.memCol("Size in Memory") { _.memSize } - t.memCol("Size in Tachyon") { _.tachyonSize } - t.memCol("Size on Disk") { _.diskSize } + t.sizeCol("Size in Memory") { _.memSize } + t.sizeCol("Size in Tachyon") { _.tachyonSize } + t.sizeCol("Size on Disk") { _.diskSize } t.build() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index d40334af1dcd0..2fd101088bd29 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -100,23 +100,23 @@ private[ui] class StreamingPage(parent: StreamingTab) t.col("Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]") { case (receiverId, _) => formatNumber(lastBatchReceivedRecord(receiverId)) } - t.col("Minimum rate\n[records/sec]") { - case (receiverId, _) => receivedRecordDistributions(receiverId).map { + t.col("Minimum rate\n[records/sec]") { case (receiverId, _) => + receivedRecordDistributions(receiverId).map { _.getQuantiles(Seq(0.0)).map(formatNumber).head }.getOrElse(empty) } - t.col("Median rate\n[records/sec]") { - case (receiverId, _) => receivedRecordDistributions(receiverId).map { + t.col("Median rate\n[records/sec]") { case (receiverId, _) => + receivedRecordDistributions(receiverId).map { _.getQuantiles(Seq(0.5)).map(formatNumber).head }.getOrElse(empty) } - t.col("Maximum rate\n[records/sec]") { - case (receiverId, _) => receivedRecordDistributions(receiverId).map { + t.col("Maximum rate\n[records/sec]") { case (receiverId, _) => + receivedRecordDistributions(receiverId).map { _.getQuantiles(Seq(1.0)).map(formatNumber).head }.getOrElse(empty) } - t.col("Last Error") { - case (_, receiverInfo) => receiverInfo.map { info => + t.col("Last Error") { case (_, receiverInfo) => + receiverInfo.map { info => val msg = s"${info.lastErrorMessage} - ${info.lastError}" if (msg.size > 100) msg.take(97) + "..." else msg }.getOrElse(empty) From dfa69f8b577faaf579e0fdbf4d64e9dfd6a6c20b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Oct 2014 16:36:48 -0700 Subject: [PATCH 11/13] Support sortable = false to disable column sorting. 
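This lets columns that only render UI widgets (log links, detail lists) opt out of
the client-side table sorting; such cells are emitted with sorttable's
"sorttable_nosort" class. A minimal sketch of the intended usage, assuming the
UITableBuilder API from this series; the row type and log URL are illustrative,
not real Spark classes:

    import org.apache.spark.ui.{UITable, UITableBuilder}

    case class Row(id: Int, logUrl: String)  // hypothetical row type

    val table: UITable[Row] = {
      val t = new UITableBuilder[Row]()
      t.intCol("ID") { _.id }
      // The link is markup, not data, so sorting this column is meaningless.
      t.customCol("Logs", sortable = false) { row =>
        <a href={row.logUrl}>stdout</a>
      }
      t.build()
    }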
--- .../org/apache/spark/deploy/worker/ui/WorkerPage.scala | 6 +++--- core/src/main/scala/org/apache/spark/ui/UITables.scala | 3 ++- .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 +++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index a3addc4e5649b..9113341430604 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -48,14 +48,14 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { t.intCol("Cores") { _.cores } t.col("State") { _.state.toString } t.sizeCol("Memory") { _.memory } - t.customCol("Job Details") { executor => + t.customCol("Job Details", sortable = false) { executor =>
    • ID: {executor.appId}
    • Name: {executor.appDesc.name}
    • User: {executor.appDesc.user}
    } - t.customCol("Logs") { executor => + t.customCol("Logs", sortable = false) { executor => stdout + t.customCol("Logs", sortable = false) { driver => stdout stderr } diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala index 61b3e0706c9c6..6b8969a8377f6 100644 --- a/core/src/main/scala/org/apache/spark/ui/UITables.scala +++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala @@ -47,7 +47,8 @@ private case class UITableColumn[T, V]( def renderCell(row: T): Seq[Node] = { val data = fieldExtractor(row) val cellContents = renderCellContents(data) -
} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 2fd101088bd29..0436786543383 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -96,7 +96,9 @@ private[ui] class StreamingPage(parent: StreamingTab) t.col("Status") { case (_, receiverInfo) => receiverInfo.map { info => if (info.active) "ACTIVE" else "INACTIVE" }.getOrElse(empty) } - t.col("Location") { case (_, receiverInfo) => receiverInfo.map(_.location).getOrElse(empty) } + t.col("Location") { case (_, receiverInfo) => + receiverInfo.map(_.location).getOrElse(empty) + } t.col("Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]") { case (receiverId, _) => formatNumber(lastBatchReceivedRecord(receiverId)) } From 6fa3c90764ea294bd1844768e5fb04e19572c46f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Oct 2014 18:07:36 -0700 Subject: [PATCH 12/13] Use table builder for stages' accumulator table. --- .../main/scala/org/apache/spark/ui/UIUtils.scala | 8 ++++++++ .../org/apache/spark/ui/env/EnvironmentPage.scala | 13 +++---------- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 8 ++++---- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 32e6b15bb0999..3d897df5c3215 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -234,6 +234,14 @@ private[spark] object UIUtils extends Logging { } + /** Render key-value pairs as a table */ + def stringPairTable(col1Name: String, col2Name: String): UITable[(Any, Any)] = { + val t = new UITableBuilder[(Any, Any)](fixedWidth = true) + t.col(col1Name) { _._1.toString } + t.col(col2Name) { _._2.toString } + t.build() + } + /** Returns an HTML table constructed by generating a row for each object in a sequence. 
*/ def listingTable[T]( headers: Seq[String], diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index 4f9b35a591340..1f9043918b66b 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -21,20 +21,13 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UITableBuilder, UITable, UIUtils, WebUIPage} +import org.apache.spark.ui.{UIUtils, WebUIPage} private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") { private val listener = parent.listener - private def stringPairTable(col1Name: String, col2Name: String): UITable[(Any, Any)] = { - val t = new UITableBuilder[(Any, Any)](fixedWidth = true) - t.col(col1Name) { _._1.toString } - t.col(col2Name) { _._2.toString } - t.build() - } - - private val propertyTable = stringPairTable("Name", "Value") - private val classpathTable = stringPairTable("Resource", "Source") + private val propertyTable = UIUtils.stringPairTable("Name", "Value") + private val classpathTable = UIUtils.stringPairTable("Resource", "Source") def render(request: HttpServletRequest): Seq[Node] = { val content = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 2414e4c65237e..7f80c8008b75f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -31,6 +31,8 @@ import org.apache.spark.scheduler.AccumulableInfo private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { private val listener = parent.listener + private val accumulableTable = UIUtils.stringPairTable("Accumulable", "Value") + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val stageId = request.getParameter("id").toInt @@ -96,10 +98,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { // scalastyle:on - val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value") - def accumulableRow(acc: AccumulableInfo) = - val accumulableTable = UIUtils.listingTable(accumulableHeaders, accumulableRow, - accumulables.values.toSeq) + val accumulableTable = + this.accumulableTable.render(accumulables.values.map(a => (a.name, a.value))) val taskHeaders: Seq[String] = Seq( From 1975cd6364d6403ec2d7f885743020b55b10fea9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Oct 2014 20:12:28 -0700 Subject: [PATCH 13/13] Use builder pattern for columns, too: This removes the messy "custom col" stuff and allows for a somewhat more "declarative"-looking style when customizing markup rendering, sorting, etc. 
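For reference, a minimal sketch of the resulting column DSL, assuming the
UITableColumn methods introduced in this commit (sortBy, formatWith, withMarkup,
isUnsortable); the row type and its fields are illustrative only:

    import org.apache.spark.ui.{UITable, UITableBuilder}
    import org.apache.spark.util.Utils

    case class WorkerRow(id: String, memoryMb: Long, logUrl: String)  // hypothetical

    val table: UITable[WorkerRow] = {
      val t = new UITableBuilder[WorkerRow]()
      t.col("ID") { _.id }
      // Sort on the raw megabyte count but display a human-readable size.
      t.col("Memory") { _.memoryMb } sortBy (_.toString) formatWith Utils.megabytesToString
      // Markup columns render arbitrary nodes; widget-only columns stay unsortable.
      t.col("Logs")(identity).withMarkup { row =>
        <a href={row.logUrl}>stdout</a>
      }.isUnsortable()
      t.build()
    }

Each col(name)(extractor) call registers the column with the builder and returns
the UITableColumn itself, so the chained calls configure the column in place
before build() is invoked.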
--- .../spark/deploy/history/HistoryPage.scala | 2 +- .../deploy/master/ui/ApplicationPage.scala | 8 +- .../spark/deploy/master/ui/MasterPage.scala | 23 +- .../spark/deploy/worker/ui/WorkerPage.scala | 18 +- .../scala/org/apache/spark/ui/UITables.scala | 133 +++++------ .../org/apache/spark/ui/jobs/StagePage.scala | 216 ++++++++---------- .../org/apache/spark/ui/storage/RDDPage.scala | 16 +- .../apache/spark/ui/storage/StoragePage.scala | 6 +- .../spark/streaming/ui/StreamingPage.scala | 24 +- 9 files changed, 211 insertions(+), 235 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index adaf6cf12a362..22784c8c2eff6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -29,7 +29,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { val appTable: UITable[ApplicationHistoryInfo] = { val t = new UITableBuilder[ApplicationHistoryInfo]() - t.customCol("App ID") { info => + t.col("App ID") (identity) withMarkup { info => val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" {info.id} } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 3e6ad7cdf9bb6..2e6de43c53b59 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -50,18 +50,18 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app private val executorsTable: UITable[ExecutorInfo] = { val t = new UITableBuilder[ExecutorInfo]() t.col("ExecutorID") { _.id.toString } - t.customCol("Worker") { executor => + t.col("Worker") (identity) withMarkup { executor => {executor.worker.id} } - t.intCol("Cores") { _.cores } + t.col("Cores") { _.cores } t.sizeCol("Memory") { _.memory } t.col("State") { _.state.toString } - t.customCol("Logs") { executor => + t.col("Logs") (identity) withMarkup { executor => stdout stderr - } + } isUnsortable() t.build() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 40ac20f635e99..a1e1ed0752897 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -43,27 +43,28 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { private val workerTable: UITable[WorkerInfo] = { val t = new UITableBuilder[WorkerInfo]() - t.customCol("ID") { worker => + t.col("ID") (identity) withMarkup { worker => {worker.id} } t.col("Address") { worker => s"${worker.host}:${worker.port}"} t.col("State") { _.state.toString } - t.intCol("Cores", formatter = c => s"$c Used") { _.coresUsed } - t.customCol("Memory", - sortKey = Some({worker: WorkerInfo => s"${worker.memory}:${worker.memoryUsed}"})) { worker => - Text(Utils.megabytesToString(worker.memory)) ++ - Text(Utils.megabytesToString(worker.memoryUsed)) + t.col("Cores") { _.coresUsed } formatWith { c: Int => s"$c Used" } + t.col("Memory") (identity) sortBy { worker => + s"${worker.memory}:${worker.memoryUsed}" + } withMarkup { worker => + Text(Utils.megabytesToString(worker.memory)) ++ + 
Text(Utils.megabytesToString(worker.memoryUsed)) } t.build() } private val appTable: UITable[ApplicationInfo] = { val t = new UITableBuilder[ApplicationInfo]() - t.customCol("ID") { app => - {app.id} + t.col("ID") (_.id) withMarkup { id => + {id} } t.col("Name") { _.id } - t.intCol("Cores") { _.coresGranted } + t.col("Cores") { _.coresGranted } t.sizeCol("Memory per Node") { _.desc.memoryPerSlave } t.dateCol("Submitted Time") { _.submitDate } t.col("User") { _.desc.user } @@ -76,11 +77,11 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { val t = new UITableBuilder[DriverInfo]() t.col("ID") { _.id } t.dateCol("Submitted Time") { _.submitDate } - t.customCol("Worker") { driver => + t.col("Worker") (identity) withMarkup { driver => driver.worker.map(w => {w.id.toString}).getOrElse(Text("None")) } t.col("State") { _.state.toString } - t.intCol("Cores") { _.desc.cores } + t.col("Cores") { _.desc.cores } t.sizeCol("Memory") { _.desc.mem.toLong } t.col("Main Class") { _.desc.command.arguments(1) } t.build() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index 9113341430604..b6e0e38bd0415 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -44,23 +44,23 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { private val executorTable: UITable[ExecutorRunner] = { val t = new UITableBuilder[ExecutorRunner]() - t.intCol("Executor ID") { _.execId } - t.intCol("Cores") { _.cores } + t.col("Executor ID") { _.execId } + t.col("Cores") { _.cores } t.col("State") { _.state.toString } t.sizeCol("Memory") { _.memory } - t.customCol("Job Details", sortable = false) { executor => + t.col("Job Details") (identity) withMarkup { executor =>
  • ID: {executor.appId}
  • Name: {executor.appDesc.name}
  • User: {executor.appDesc.user}
- } - t.customCol("Logs", sortable = false) { executor => + } isUnsortable() + t.col("Logs") (identity) withMarkup { executor => stdout stderr - } + } isUnsortable() t.build() } @@ -69,12 +69,12 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { t.col("Driver ID") { _.driverId } t.col("Main Class") { _.driverDesc.command.arguments(1) } t.col("State") { _.finalState.getOrElse(DriverState.RUNNING).toString } - t.intCol("Cores") { _.driverDesc.cores } + t.col("Cores") { _.driverDesc.cores } t.sizeCol("Memory") { _.driverDesc.mem } - t.customCol("Logs", sortable = false) { driver => + t.col("Logs") (identity) withMarkup { driver => stdout stderr - } + } isUnsortable() t.col("Notes") { _.finalException.getOrElse("").toString } t.build() } diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala index 6b8969a8377f6..7be5d593691f1 100644 --- a/core/src/main/scala/org/apache/spark/ui/UITables.scala +++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala @@ -29,34 +29,63 @@ import org.apache.spark.util.Utils * Describes how to render a column of values in a web UI table. * * @param name the name / title of this column - * @param formatter function that formats values for display in the table - * @param sortable if false, this column will not be sortable - * @param sortKey optional function for sorting by a key other than `formatter(value)` * @param fieldExtractor function for extracting this field's value from the table's row data type * @tparam T the table's row data type * @tparam V this column's value type */ -private case class UITableColumn[T, V]( +case class UITableColumn[T, V]( name: String, - formatter: V => String, - sortable: Boolean, - sortKey: Option[V => String], fieldExtractor: T => V) { + private var sortable: Boolean = true + private var sortKey: Option[V => String] = None + private var formatter: V => String = x => x.toString + private var cellContentsRenderer: V => Seq[Node] = (data: V) => Text(formatter(data)) + + /** + * Optional method for sorting this table by a key other than the cell's text contents. + */ + def sortBy(keyFunc: V => String): UITableColumn[T, V] = { + sortKey = Some(keyFunc) + this + } + + /** + * Override the default cell formatting of the extracted value. By default, values are rendered + * by calling toString(). + */ + def formatWith(formatFunc: V => String): UITableColumn[T, V] = { + formatter = formatFunc + this + } + + /** + * Make this column unsortable. This is useful for columns that display UI elements, such + * as buttons to link to logs + */ + def isUnsortable(): UITableColumn[T, V] = { + sortable = false + this + } + + /** + * Customize the markup used to render this table cell. The markup should only describe how to + * render the contents of the TD tag, not the TD tag itself. This overrides `formatWith`. + */ + def withMarkup(markupFunc: V => Seq[Node]): UITableColumn[T, V] = { + cellContentsRenderer = markupFunc + this + } + /** Render the TD tag for this row */ - def renderCell(row: T): Seq[Node] = { + def _renderCell(row: T): Seq[Node] = { val data = fieldExtractor(row) - val cellContents = renderCellContents(data) + val cellContents = cellContentsRenderer(data) val cls = if (sortable) None else Some(Text("sorttable_nosort"))
} - - /** Render the contents of the TD tag for this row. The contents may be a string or HTML */ - def renderCellContents(data: V): Seq[Node] = { - Text(formatter(data)) - } } /** @@ -94,7 +123,7 @@ private[spark] class UITable[T] (cols: Seq[UITableColumn[T, _]], fixedWidth: Boo } private def renderRow(row: T): Seq[Node] = { - val tds = cols.map(_.renderCell(row)) + val tds = cols.map(_._renderCell(row)) { tds } } @@ -138,7 +167,7 @@ private[spark] class UITable[T] (cols: Seq[UITableColumn[T, _]], fixedWidth: Boo * code like * * builder.col("Id") { _.id } - * builder.memCol("Memory" { _.memory } + * builder.sizeCol("Memory" { _.memory } * * Columns have additional options, such as controlling their sort keys; see the individual * methods' documentation for more details. @@ -155,84 +184,42 @@ private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) { private val cols = mutable.Buffer[UITableColumn[T, _]]() /** - * Display a column with custom HTML markup. The markup should only describe how to - * render the contents of the TD tag, not the TD tag itself. + * General builder method for table columns. By default, this extracts a field + * and displays it as as a string. You can call additional methods on the result + * of this method to customize this column's display. */ - def customCol[V]( - name: String, - sortable: Boolean = true, - sortKey: Option[T => String] = None)(renderer: T => Seq[Node]): UITableBuilder[T] = { - val customColumn = new UITableColumn[T, T](name, null, sortable, sortKey, identity) { - override def renderCellContents(row: T) = renderer(row) - } - cols.append(customColumn) - this - } - - def col[V]( - name: String, - formatter: V => String, - sortable: Boolean = true, - sortKey: Option[V => String] = None)(fieldExtractor: T => V): UITableBuilder[T] = { - cols.append(UITableColumn(name, formatter, sortable, sortKey, fieldExtractor)) - this - } - - def col( - name: String, - sortable: Boolean = true, - sortKey: Option[String => String] = None)(fieldExtractor: T => String): UITableBuilder[T] = { - col[String](name, {x: String => x}, sortable, sortKey)(fieldExtractor) - } - - def intCol( - name: String, - formatter: Int => String = { x: Int => x.toString }, - sortable: Boolean = true)(fieldExtractor: T => Int): UITableBuilder[T] = { - col[Int](name, formatter, sortable = sortable)(fieldExtractor) + def col[V](name: String)(fieldExtractor: T => V): UITableColumn[T, V] = { + val newCol = new UITableColumn[T, V](name, fieldExtractor) + cols.append(newCol) + newCol } /** * Display a column of sizes, in megabytes, as human-readable strings, such as "4.0 MB". */ - def sizeCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { - col[Long]( - name, - formatter = Utils.megabytesToString, - sortKey = Some(x => x.toString))(fieldExtractor) + def sizeCol(name: String)(fieldExtractor: T => Long) { + col[Long](name)(fieldExtractor) sortBy (x => x.toString) formatWith Utils.megabytesToString } /** * Display a column of dates as yyyy/MM/dd HH:mm:ss format. */ - def dateCol(name: String)(fieldExtractor: T => Date): UITableBuilder[T] = { - col[Date](name, formatter = UIUtils.formatDate)(fieldExtractor) + def dateCol(name: String)(fieldExtractor: T => Date) { + col[Date](name)(fieldExtractor) formatWith UIUtils.formatDate } /** * Display a column of dates as yyyy/MM/dd HH:mm:ss format. 
*/ - def epochDateCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { - col[Long](name, formatter = UIUtils.formatDate)(fieldExtractor) + def epochDateCol(name: String)(fieldExtractor: T => Long) { + col[Long](name)(fieldExtractor) formatWith UIUtils.formatDate } /** * Display a column of durations, in milliseconds, as human-readable strings, such as "12 s". */ - def durationCol(name: String)(fieldExtractor: T => Long): UITableBuilder[T] = { - col[Long](name, formatter = UIUtils.formatDuration, sortKey = Some(_.toString))(fieldExtractor) - } - - /** - * Display a column of optional durations, in milliseconds, as human-readable strings, - * such as "12 s". If the duration is None, then '-' will be displayed. - */ - def optDurationCol(name: String)(fieldExtractor: T => Option[Long]): UITableBuilder[T] = { - col[Option[Long]]( - name, - formatter = { _.map(UIUtils.formatDuration).getOrElse("-")}, - sortKey = Some(_.getOrElse("-").toString) - )(fieldExtractor) + def durationCol(name: String)(fieldExtractor: T => Long) { + col[Long](name)(fieldExtractor) sortBy (_.toString) formatWith UIUtils.formatDuration } def build(): UITable[T] = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 7f80c8008b75f..2255d31fac73d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -20,12 +20,11 @@ package org.apache.spark.ui.jobs import java.util.Date import javax.servlet.http.HttpServletRequest -import scala.xml.{Node, Unparsed} +import scala.xml.{Text, Node, Unparsed} -import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} +import org.apache.spark.ui._ import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.scheduler.AccumulableInfo /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { @@ -59,6 +58,101 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val hasShuffleWrite = stageData.shuffleWriteBytes > 0 val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0 + val taskTableRenderer: UITable[TaskUIData] = { + val t = new UITableBuilder[TaskUIData]() + t.col("Index") { _.taskInfo.index } + t.col("Id") { _.taskInfo.taskId } + t.col("Attempt") { _.taskInfo } formatWith { info => + if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString + } sortBy { info => + info.taskId.toString + } + t.col("Status") { _.taskInfo.status } + t.col("Locality level") { _.taskInfo.taskLocality } + t.col("Executor ID / Host") { case TaskUIData(info, _, _) => + s"${info.executorId} / ${info.host}" + } + t.dateCol("Launch Time") { case TaskUIData(info, _, _) => + new Date(info.launchTime) + } + t.col("Duration") { case TaskUIData(info, metrics, _) => + val duration = + if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) + else metrics.map(_.executorRunTime).getOrElse(1L) + (info, metrics, duration) + } formatWith { case (info, metrics, duration) => + if (info.status == "RUNNING") UIUtils.formatDuration(duration) + else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("") + } sortBy { case (info, metrics, duration) => + duration.toString + } + + t.durationCol("GC Time") { _.taskMetrics.map(_.jvmGCTime).getOrElse(0L)} + t.durationCol("Serialization 
Time") { + _.taskMetrics.map(_.resultSerializationTime).getOrElse(0L) + } + t.col("Accumulators")(identity) withMarkup { case TaskUIData(info, _, _) => + Unparsed( + info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("
") + ) + } + if (hasInput) { + t.col("Input") { + _.taskMetrics.flatMap(_.inputMetrics) + } sortBy { + _.map(_.bytesRead.toString).getOrElse("") + } formatWith { _.map( + m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase})") + .getOrElse("") + } + } + if (hasShuffleRead) { + t.col("Shuffle Read") { + _.taskMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) + } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + } + if (hasShuffleWrite) { + t.col("Write Time") { + _.taskMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) + } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(t => t / (1000 * 1000)).map { ms => + if (ms == 0) "" else UIUtils.formatDuration(ms) + }.getOrElse("") + } + + t.col("Shuffle Write") { + _.taskMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) + } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + } + if (hasBytesSpilled) { + t.col("Shuffle Spill (Memory)") { _.taskMetrics.map(_.memoryBytesSpilled) } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + + t.col("Shuffle Spill (Disk)") { _.taskMetrics.map(_.diskBytesSpilled) } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + } + t.col("Errors")(identity) withMarkup { case TaskUIData(_, _, errorMessage) => + errorMessage.map { e =>
{e}
}.getOrElse(Text("")) + } + t.build() + } + // scalastyle:off val summary =
@@ -101,18 +195,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val accumulableTable = this.accumulableTable.render(accumulables.values.map(a => (a.name, a.value))) - val taskHeaders: Seq[String] = - Seq( - "Index", "ID", "Attempt", "Status", "Locality Level", "Executor ID / Host", - "Launch Time", "Duration", "GC Time", "Accumulators") ++ - {if (hasInput) Seq("Input") else Nil} ++ - {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ - {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ - {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++ - Seq("Errors") - - val taskTable = UIUtils.listingTable( - taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) + val taskTable = taskTableRenderer.render(tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined) @@ -230,107 +313,4 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent) } } - - def taskRow( - hasInput: Boolean, - hasShuffleRead: Boolean, - hasShuffleWrite: Boolean, - hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { - taskData match { case TaskUIData(info, metrics, errorMessage) => - val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) - else metrics.map(_.executorRunTime).getOrElse(1L) - val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) - else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("") - val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) - val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) - - val maybeInput = metrics.flatMap(_.inputMetrics) - val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") - val inputReadable = maybeInput - .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") - .getOrElse("") - - val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) - val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") - val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") - - val maybeShuffleWrite = - metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) - val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") - val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") - - val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) - val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") - val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms => - if (ms == 0) "" else UIUtils.formatDuration(ms) - }.getOrElse("") - - val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) - val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") - val memoryBytesSpilledReadable = - maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") - - val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) - val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") - val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") - -
- - - - - - - - - - - - {if (hasInput) { - - }} - {if (hasShuffleRead) { - - }} - {if (hasShuffleWrite) { - - - }} - {if (hasBytesSpilled) { - - - }} - - - } - } } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index fb52aac2caf82..b9d1b7a9d3600 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -35,13 +35,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { s"${status.blockManagerId.host}:${status.blockManagerId.port}" } def getMemUsed(x: (Int, StorageStatus)): String = x._2.memUsedByRdd(x._1).toString - t.customCol( - "Memory usage", - sortKey = Some(getMemUsed)) { case (rddId, status) => - val used = Utils.bytesToString(status.memUsedByRdd(rddId)) - val remaining = Utils.bytesToString(status.memRemaining) - Text(s"$used ($remaining Remaining)") - } + t.col("Memory Usage") (identity) sortBy { + getMemUsed + } withMarkup { case (rddId, status) => + val used = Utils.bytesToString(status.memUsedByRdd(rddId)) + val remaining = Utils.bytesToString(status.memRemaining) + Text(s"$used ($remaining Remaining)") + } t.sizeCol("Disk Usage") { case (rddId, status) => status.diskUsedByRdd(rddId) } t.build() } @@ -52,7 +52,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { t.col("Storage Level") { case (id, block, locations) => block.storageLevel.description } t. sizeCol("Size in Memory") { case (id, block, locations) => block.memSize } t.sizeCol("Size on Disk") { case (id, block, locations) => block.diskSize } - t.customCol("Executors") { case (id, block, locations) => + t.col("Executors") (identity) withMarkup { case (id, block, locations) => locations.map(l => {l}
) } t.build() diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index b3ddf8a87480a..a72bc368ab3b6 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -30,14 +30,14 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { val rddTable: UITable[RDDInfo] = { val t = new UITableBuilder[RDDInfo]() - t.customCol("RDD Name") { rdd => + t.col("RDD Name") (identity) withMarkup { rdd => {rdd.name} } t.col("Storage Level") { _.storageLevel.description } - t.intCol("Cached Partitions") { _.numCachedPartitions } - t. col("Fraction Cached") { rdd => + t.col("Cached Partitions") { _.numCachedPartitions } + t.col("Fraction Cached") { rdd => "%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions) } t.sizeCol("Size in Memory") { _.memSize } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 0436786543383..0aff0d84ed84e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -46,15 +46,23 @@ private[ui] class StreamingPage(parent: StreamingTab) UIUtils.headerSparkPage("Streaming", content, parent, Some(5000)) } - private val batchStatsTable: UITable[(String, Option[Long], Option[Seq[Double]])] = { - val t = new UITableBuilder[(String, Option[Long], Option[Seq[Double]])]() + private type BatchStatsRowType = (String, Option[Long], Option[Seq[Double]]) + private val batchStatsTable: UITable[BatchStatsRowType] = { + val t = new UITableBuilder[BatchStatsRowType]() t.col("Metric") { _._1 } - t.optDurationCol("Last batch") { _._2 } - t.optDurationCol("Minimum") { _._3.map(_(0).toLong) } - t.optDurationCol("25th percentile") { _._3.map(_(1).toLong) } - t.optDurationCol("Median") { _._3.map(_(2).toLong) } - t.optDurationCol("75th percentile") { _._3.map(_(3).toLong) } - t.optDurationCol("Maximum") { _._3.map(_(4).toLong) } + t.col("Last batch") { _._2 } + def optDurationCol(name: String)(fieldExtractor: BatchStatsRowType => Option[Long]) { + t.col(name)(fieldExtractor) sortBy { + _.getOrElse("").toString + } formatWith { + _.map(UIUtils.formatDuration).getOrElse("-") + } + } + t.col("Minimum") { _._3.map(_(0).toLong) } + optDurationCol("25th percentile") { _._3.map(_(1).toLong) } + optDurationCol("Median") { _._3.map(_(2).toLong) } + optDurationCol("75th percentile") { _._3.map(_(3).toLong) } + optDurationCol("Maximum") { _._3.map(_(4).toLong) } t.build() }