diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index 9d1d66a0e15a4..db53a400ed62f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -24,8 +24,9 @@ import org.apache.spark.SparkException import org.apache.spark.scheduler.StageInfo import org.apache.spark.status.api.v1.StageStatus._ import org.apache.spark.status.api.v1.TaskSorting._ -import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.{SparkUI, UIUtils} import org.apache.spark.ui.jobs.ApiHelper._ +import org.apache.spark.util.Utils @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class StagesResource extends BaseAppResource { @@ -189,32 +190,42 @@ private[v1] class StagesResource extends BaseAppResource { val taskMetricsContainsValue = (task: TaskData) => task.taskMetrics match { case None => false case Some(metrics) => - (containsValue(task.taskMetrics.get.executorDeserializeTime) - || containsValue(task.taskMetrics.get.executorRunTime) - || containsValue(task.taskMetrics.get.jvmGcTime) - || containsValue(task.taskMetrics.get.resultSerializationTime) - || containsValue(task.taskMetrics.get.memoryBytesSpilled) - || containsValue(task.taskMetrics.get.diskBytesSpilled) - || containsValue(task.taskMetrics.get.peakExecutionMemory) - || containsValue(task.taskMetrics.get.inputMetrics.bytesRead) + (containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorDeserializeTime)) + || containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorRunTime)) + || containsValue(UIUtils.formatDuration(task.taskMetrics.get.jvmGcTime)) + || containsValue(UIUtils.formatDuration(task.taskMetrics.get.resultSerializationTime)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.memoryBytesSpilled)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.diskBytesSpilled)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.peakExecutionMemory)) + || containsValue(Utils.bytesToString(task.taskMetrics.get.inputMetrics.bytesRead)) || containsValue(task.taskMetrics.get.inputMetrics.recordsRead) - || containsValue(task.taskMetrics.get.outputMetrics.bytesWritten) + || containsValue(Utils.bytesToString( + task.taskMetrics.get.outputMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.outputMetrics.recordsWritten) - || containsValue(task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime) + || containsValue(UIUtils.formatDuration( + task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime)) + || containsValue(Utils.bytesToString( + task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) + || containsValue(Utils.bytesToString( + task.taskMetrics.get.shuffleReadMetrics.localBytesRead + + task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead)) || containsValue(task.taskMetrics.get.shuffleReadMetrics.recordsRead) - || containsValue(task.taskMetrics.get.shuffleWriteMetrics.bytesWritten) + || containsValue(Utils.bytesToString( + task.taskMetrics.get.shuffleWriteMetrics.bytesWritten)) || containsValue(task.taskMetrics.get.shuffleWriteMetrics.recordsWritten) - || containsValue(task.taskMetrics.get.shuffleWriteMetrics.writeTime)) + || containsValue(UIUtils.formatDuration( + task.taskMetrics.get.shuffleWriteMetrics.writeTime / 1000000))) } val filteredTaskDataSequence: Seq[TaskData] = taskDataList.filter(f => (containsValue(f.taskId) || containsValue(f.index) || containsValue(f.attempt) - || containsValue(f.launchTime) + || containsValue(UIUtils.formatDate(f.launchTime)) || containsValue(f.resultFetchStart.getOrElse(defaultOptionString)) || containsValue(f.executorId) || containsValue(f.host) || containsValue(f.status) || containsValue(f.taskLocality) || containsValue(f.speculative) || containsValue(f.errorMessage.getOrElse(defaultOptionString)) || taskMetricsContainsValue(f) - || containsValue(f.schedulerDelay) || containsValue(f.gettingResultTime))) + || containsValue(UIUtils.formatDuration(f.schedulerDelay)) + || containsValue(UIUtils.formatDuration(f.gettingResultTime)))) filteredTaskDataSequence }