From a1e72f7d7360646fa45fd6a4ee618c9d21851b47 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 19 Apr 2019 14:44:34 -0500 Subject: [PATCH 1/4] [SPARK-25719] : Search functionality in datatables in stages page should search over formatted data rather than the raw data The Pull Request to add datatables to stage page SPARK-21809 got merged. The search functionality in those datatables being a great improvement for searching through a large number of tasks, also performs search over the raw data rather than the formatted data displayed in the tables. It would be great if the search can happen for the formatted data as well. Added code to enable searching over displayed data in tables e.g. "165.7 MiB" or "0.3 ms" --- .../spark/status/api/v1/StagesResource.scala | 39 ++++++++++++------- .../scala/org/apache/spark/ui/UIUtils.scala | 10 +++++ 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index 9d1d66a0e15a4..886fdfd6356b4 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkException import org.apache.spark.scheduler.StageInfo import org.apache.spark.status.api.v1.StageStatus._ import org.apache.spark.status.api.v1.TaskSorting._ -import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.{SparkUI, UIUtils} import org.apache.spark.ui.jobs.ApiHelper._ @Produces(Array(MediaType.APPLICATION_JSON)) @@ -189,32 +189,41 @@ private[v1] class StagesResource extends BaseAppResource { val taskMetricsContainsValue = (task: TaskData) => task.taskMetrics match { case None => false case Some(metrics) => - (containsValue(task.taskMetrics.get.executorDeserializeTime) - || containsValue(task.taskMetrics.get.executorRunTime) - || containsValue(task.taskMetrics.get.jvmGcTime) - || containsValue(task.taskMetrics.get.resultSerializationTime) - || containsValue(task.taskMetrics.get.memoryBytesSpilled) - || containsValue(task.taskMetrics.get.diskBytesSpilled) - || containsValue(task.taskMetrics.get.peakExecutionMemory) - || containsValue(task.taskMetrics.get.inputMetrics.bytesRead) + (containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorDeserializeTime)) + || containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorRunTime)) + || containsValue(UIUtils.formatDuration(task.taskMetrics.get.jvmGcTime)) + || containsValue(UIUtils.formatDuration(task.taskMetrics.get.resultSerializationTime)) + || containsValue(UIUtils.formatBytes(task.taskMetrics.get.memoryBytesSpilled)) + || containsValue(UIUtils.formatBytes(task.taskMetrics.get.diskBytesSpilled)) + || containsValue(UIUtils.formatBytes(task.taskMetrics.get.peakExecutionMemory)) + || containsValue(UIUtils.formatBytes(task.taskMetrics.get.inputMetrics.bytesRead)) || containsValue(task.taskMetrics.get.inputMetrics.recordsRead) - || containsValue(task.taskMetrics.get.outputMetrics.bytesWritten) + || containsValue(UIUtils.formatBytes(task.taskMetrics.get.outputMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.outputMetrics.recordsWritten) - || containsValue(task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime) + || containsValue(UIUtils.formatDuration( + task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime)) + || containsValue(UIUtils.formatBytes( + task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) + || containsValue(UIUtils.formatBytes( + task.taskMetrics.get.shuffleReadMetrics.localBytesRead + + task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) || containsValue(task.taskMetrics.get.shuffleReadMetrics.recordsRead) - || containsValue(task.taskMetrics.get.shuffleWriteMetrics.bytesWritten) + || containsValue(UIUtils.formatBytes( + task.taskMetrics.get.shuffleWriteMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.shuffleWriteMetrics.recordsWritten) - || containsValue(task.taskMetrics.get.shuffleWriteMetrics.writeTime)) + || containsValue(UIUtils.formatDuration( + task.taskMetrics.get.shuffleWriteMetrics.writeTime / 1000000))) } val filteredTaskDataSequence: Seq[TaskData] = taskDataList.filter(f => (containsValue(f.taskId) || containsValue(f.index) || containsValue(f.attempt) - || containsValue(f.launchTime) + || containsValue(UIUtils.formatDate(f.launchTime)) || containsValue(f.resultFetchStart.getOrElse(defaultOptionString)) || containsValue(f.executorId) || containsValue(f.host) || containsValue(f.status) || containsValue(f.taskLocality) || containsValue(f.speculative) || containsValue(f.errorMessage.getOrElse(defaultOptionString)) || taskMetricsContainsValue(f) - || containsValue(f.schedulerDelay) || containsValue(f.gettingResultTime))) + || containsValue(UIUtils.formatDuration(f.schedulerDelay)) + || containsValue(UIUtils.formatDuration(f.gettingResultTime)))) filteredTaskDataSequence } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 11647c0c7c623..63a731b836b30 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -145,6 +145,16 @@ private[spark] object UIUtils extends Logging { } } + def formatBytes(bytes: Long): String = { + if (bytes == 0) { + return "0.0 B" + } + val factor = 1024 + val sizes = Array("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB") + val num = (Math.floor(Math.log(bytes.toDouble) / Math.log(factor.toDouble))).toInt + "%.1f".format((bytes / Math.pow(factor, num)).toFloat) + " " + sizes(num) + } + // Yarn has to go through a proxy so the base uri is provided and has to be on all links def uiRoot(request: HttpServletRequest): String = { // Knox uses X-Forwarded-Context to notify the application the base path From a3a74f72c75716a36bf1adde15e38a4f4b04ab82 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 22 Apr 2019 14:07:31 -0500 Subject: [PATCH 2/4] [SPARK-25719] : Fixing indentation --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 63a731b836b30..b0019434866d4 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -148,7 +148,7 @@ private[spark] object UIUtils extends Logging { def formatBytes(bytes: Long): String = { if (bytes == 0) { return "0.0 B" - } + } val factor = 1024 val sizes = Array("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB") val num = (Math.floor(Math.log(bytes.toDouble) / Math.log(factor.toDouble))).toInt From 8822c82ff2b362dc64d7ce073be12bed7d8cb59a Mon Sep 17 00:00:00 2001 From: pgandhi Date: Thu, 9 May 2019 15:23:19 -0500 Subject: [PATCH 3/4] [SPARK-25719] : Refactoring method name and adding comment --- .../spark/status/api/v1/StagesResource.scala | 17 +++++++++-------- .../scala/org/apache/spark/ui/UIUtils.scala | 3 ++- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index 886fdfd6356b4..66dd64ae455bf 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -193,22 +193,23 @@ private[v1] class StagesResource extends BaseAppResource { || containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorRunTime)) || containsValue(UIUtils.formatDuration(task.taskMetrics.get.jvmGcTime)) || containsValue(UIUtils.formatDuration(task.taskMetrics.get.resultSerializationTime)) - || containsValue(UIUtils.formatBytes(task.taskMetrics.get.memoryBytesSpilled)) - || containsValue(UIUtils.formatBytes(task.taskMetrics.get.diskBytesSpilled)) - || containsValue(UIUtils.formatBytes(task.taskMetrics.get.peakExecutionMemory)) - || containsValue(UIUtils.formatBytes(task.taskMetrics.get.inputMetrics.bytesRead)) + || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.memoryBytesSpilled)) + || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.diskBytesSpilled)) + || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.peakExecutionMemory)) + || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.inputMetrics.bytesRead)) || containsValue(task.taskMetrics.get.inputMetrics.recordsRead) - || containsValue(UIUtils.formatBytes(task.taskMetrics.get.outputMetrics.bytesWritten)) + || containsValue(UIUtils.formatBytesBinary( + task.taskMetrics.get.outputMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.outputMetrics.recordsWritten) || containsValue(UIUtils.formatDuration( task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime)) - || containsValue(UIUtils.formatBytes( + || containsValue(UIUtils.formatBytesBinary( task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) - || containsValue(UIUtils.formatBytes( + || containsValue(UIUtils.formatBytesBinary( task.taskMetrics.get.shuffleReadMetrics.localBytesRead + task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) || containsValue(task.taskMetrics.get.shuffleReadMetrics.recordsRead) - || containsValue(UIUtils.formatBytes( + || containsValue(UIUtils.formatBytesBinary( task.taskMetrics.get.shuffleWriteMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.shuffleWriteMetrics.recordsWritten) || containsValue(UIUtils.formatDuration( diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index b0019434866d4..59c730d9ca863 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -145,7 +145,8 @@ private[spark] object UIUtils extends Logging { } } - def formatBytes(bytes: Long): String = { + // Format raw bytes to binary prefix byte units + def formatBytesBinary(bytes: Long): String = { if (bytes == 0) { return "0.0 B" } From 92089145e739bb070aec53e0c20a48142f7d5118 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 10 May 2019 09:14:35 -0500 Subject: [PATCH 4/4] [SPARK-25719] : Replace formatBytesBinary with existing method bytesToString --- .../spark/status/api/v1/StagesResource.scala | 17 +++++++++-------- .../scala/org/apache/spark/ui/UIUtils.scala | 11 ----------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index 66dd64ae455bf..db53a400ed62f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -26,6 +26,7 @@ import org.apache.spark.status.api.v1.StageStatus._ import org.apache.spark.status.api.v1.TaskSorting._ import org.apache.spark.ui.{SparkUI, UIUtils} import org.apache.spark.ui.jobs.ApiHelper._ +import org.apache.spark.util.Utils @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class StagesResource extends BaseAppResource { @@ -193,23 +194,23 @@ private[v1] class StagesResource extends BaseAppResource { || containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorRunTime)) || containsValue(UIUtils.formatDuration(task.taskMetrics.get.jvmGcTime)) || containsValue(UIUtils.formatDuration(task.taskMetrics.get.resultSerializationTime)) - || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.memoryBytesSpilled)) - || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.diskBytesSpilled)) - || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.peakExecutionMemory)) - || containsValue(UIUtils.formatBytesBinary(task.taskMetrics.get.inputMetrics.bytesRead)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.memoryBytesSpilled)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.diskBytesSpilled)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.peakExecutionMemory)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.inputMetrics.bytesRead)) || containsValue(task.taskMetrics.get.inputMetrics.recordsRead) - || containsValue(UIUtils.formatBytesBinary( + || containsValue(Utils.bytesToString( task.taskMetrics.get.outputMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.outputMetrics.recordsWritten) || containsValue(UIUtils.formatDuration( task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime)) - || containsValue(UIUtils.formatBytesBinary( + || containsValue(Utils.bytesToString( task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) - || containsValue(UIUtils.formatBytesBinary( + || containsValue(Utils.bytesToString( task.taskMetrics.get.shuffleReadMetrics.localBytesRead + task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) || containsValue(task.taskMetrics.get.shuffleReadMetrics.recordsRead) - || containsValue(UIUtils.formatBytesBinary( + || containsValue(Utils.bytesToString( task.taskMetrics.get.shuffleWriteMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.shuffleWriteMetrics.recordsWritten) || containsValue(UIUtils.formatDuration( diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 59c730d9ca863..11647c0c7c623 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -145,17 +145,6 @@ private[spark] object UIUtils extends Logging { } } - // Format raw bytes to binary prefix byte units - def formatBytesBinary(bytes: Long): String = { - if (bytes == 0) { - return "0.0 B" - } - val factor = 1024 - val sizes = Array("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB") - val num = (Math.floor(Math.log(bytes.toDouble) / Math.log(factor.toDouble))).toInt - "%.1f".format((bytes / Math.pow(factor, num)).toFloat) + " " + sizes(num) - } - // Yarn has to go through a proxy so the base uri is provided and has to be on all links def uiRoot(request: HttpServletRequest): String = { // Knox uses X-Forwarded-Context to notify the application the base path