From e2e6fb8b71b3919771bc832e8c9347747c13bb11 Mon Sep 17 00:00:00 2001 From: wuyi Date: Thu, 26 Sep 2019 19:51:26 +0800 Subject: [PATCH 01/21] recover live entities from kvstore --- .../spark/status/AppStatusListener.scala | 96 ++++- .../org/apache/spark/status/LiveEntity.scala | 51 +-- .../org/apache/spark/status/api/v1/api.scala | 26 +- .../org/apache/spark/status/storeTypes.scala | 201 +++++++++- .../apache/spark/storage/StorageLevel.scala | 14 + .../spark/status/AppStatusListenerSuite.scala | 371 ++++++++++++++++++ .../execution/ui/SQLAppStatusListener.scala | 84 +++- .../sql/execution/ui/SQLAppStatusStore.scala | 51 +++ .../ui/SQLAppStatusListenerSuite.scala | 96 +++++ 9 files changed, 940 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index c85b3caf8a5ef..71bce283b26b0 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -69,13 +69,14 @@ private[spark] class AppStatusListener( // Keep track of live entities, so that task metrics can be efficiently updated (without // causing too many writes to the underlying store, and other expensive operations). - private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]() - private val liveJobs = new HashMap[Int, LiveJob]() - private val liveExecutors = new HashMap[String, LiveExecutor]() - private val deadExecutors = new HashMap[String, LiveExecutor]() - private val liveTasks = new HashMap[Long, LiveTask]() - private val liveRDDs = new HashMap[Int, LiveRDD]() - private val pools = new HashMap[String, SchedulerPool]() + // variables are visible for tests. + private[spark] val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]() + private[spark] val liveJobs = new HashMap[Int, LiveJob]() + private[spark] val liveExecutors = new HashMap[String, LiveExecutor]() + private[spark] val deadExecutors = new HashMap[String, LiveExecutor]() + private[spark] val liveTasks = new HashMap[Long, LiveTask]() + private[spark] val liveRDDs = new HashMap[Int, LiveRDD]() + private[spark] val pools = new HashMap[String, SchedulerPool]() private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" // Keep the active executor count as a separate variable to avoid having to do synchronization @@ -103,6 +104,81 @@ private[spark] class AppStatusListener( } } + // visible for tests + private[spark] def recoverLiveEntities(): Unit = { + if (!live) { + kvstore.view(classOf[JobDataWrapper]) + .asScala.filter(_.info.status == JobExecutionStatus.RUNNING) + .map(_.toLiveJob).foreach(job => liveJobs.put(job.jobId, job)) + + kvstore.view(classOf[StageDataWrapper]).asScala + .filter { stageData => + stageData.info.status == v1.StageStatus.PENDING || + stageData.info.status == v1.StageStatus.ACTIVE + } + .map { stageData => + val stageId = stageData.info.stageId + val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq + stageData.toLiveStage(jobs) + }.foreach { stage => + val stageId = stage.info.stageId + val stageAttempt = stage.info.attemptNumber() + liveStages.put((stageId, stageAttempt), stage) + + kvstore.view(classOf[ExecutorStageSummaryWrapper]) + .index("stage") + .first(Array(stageId, stageAttempt)) + .last(Array(stageId, stageAttempt)) + .asScala + .map(_.toLiveExecutorStageSummary) + .foreach { esummary => + stage.executorSummaries.put(esummary.executorId, esummary) + if (esummary.isBlacklisted) { + stage.blackListedExecutors += esummary.executorId + liveExecutors(esummary.executorId).isBlacklisted = true + liveExecutors(esummary.executorId).blacklistedInStages += stageId + } + } + + + kvstore.view(classOf[TaskDataWrapper]) + .parent(Array(stageId, stageAttempt)) + .index(TaskIndexNames.STATUS) + .first(TaskState.RUNNING.toString) + .last(TaskState.RUNNING.toString) + .closeableIterator().asScala + .map(_.toLiveTask) + .foreach { task => + liveTasks.put(task.info.taskId, task) + stage.activeTasksPerExecutor(task.info.executorId) += 1 + } + stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue()) + } + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) + .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) + kvstore.view(classOf[RDDStorageInfoWrapper]).asScala + .foreach { rddWrapper => + val liveRdd = rddWrapper.toLiveRDD(liveExecutors) + liveRDDs.put(liveRdd.info.id, liveRdd) + } + kvstore.view(classOf[PoolData]).asScala.foreach { poolData => + val schedulerPool = poolData.toSchedulerPool + pools.put(schedulerPool.name, schedulerPool) + } + } + } + + // used for tests only + private[spark] def clearLiveEntities(): Unit = { + liveStages.clear() + liveJobs.clear() + liveExecutors.clear() + deadExecutors.clear() + liveTasks.clear() + liveRDDs.clear() + pools.clear() + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case SparkListenerLogStart(version) => sparkVersion = version case _ => @@ -877,6 +953,12 @@ private[spark] class AppStatusListener( } } + // used in tests only + private[spark] def flush(): Unit = { + val now = System.nanoTime() + flush(update(_, now)) + } + /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { liveStages.values.asScala.foreach { stage => diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index aa4a21c1bb818..02534841c1817 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -59,14 +59,14 @@ private[spark] abstract class LiveEntity { } -private class LiveJob( +private[spark] class LiveJob( val jobId: Int, - name: String, + val name: String, val submissionTime: Option[Date], val stageIds: Seq[Int], - jobGroup: Option[String], - numTasks: Int, - sqlExecutionId: Option[Long]) extends LiveEntity { + val jobGroup: Option[String], + val numTasks: Int, + val sqlExecutionId: Option[Long]) extends LiveEntity { var activeTasks = 0 var completedTasks = 0 @@ -74,6 +74,8 @@ private class LiveJob( // Holds both the stage ID and the task index, packed into a single long value. val completedIndices = new OpenHashSet[Long]() + // will only be set when recover LiveJob is needed. + var completedIndicesNum = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -85,6 +87,8 @@ private class LiveJob( var completionTime: Option[Date] = None var completedStages: Set[Int] = Set() + // will only be set when recover LiveJob is needed. + var completedStagesNum = 0 var activeStages = 0 var failedStages = 0 @@ -104,9 +108,9 @@ private class LiveJob( skippedTasks, failedTasks, killedTasks, - completedIndices.size, + completedIndices.size + completedIndicesNum, activeStages, - completedStages.size, + completedStages.size + completedStagesNum, skippedStages.size, failedStages, killedSummary) @@ -115,7 +119,7 @@ private class LiveJob( } -private class LiveTask( +private[spark] class LiveTask( var info: TaskInfo, stageId: Int, stageAttemptId: Int, @@ -229,7 +233,7 @@ private class LiveTask( } -private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { +private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { var hostPort: String = null var host: String = null @@ -272,7 +276,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE def hasMemoryInfo: Boolean = totalOnHeap >= 0L // peak values for executor level metrics - val peakExecutorMetrics = new ExecutorMetrics() + var peakExecutorMetrics = new ExecutorMetrics() def hostname: String = if (host != null) host else hostPort.split(":")(0) @@ -316,10 +320,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE } } -private class LiveExecutorStageSummary( +private[spark] class LiveExecutorStageSummary( stageId: Int, attemptId: Int, - executorId: String) extends LiveEntity { + val executorId: String) extends LiveEntity { import LiveEntityHelpers._ @@ -353,7 +357,7 @@ private class LiveExecutorStageSummary( } -private class LiveStage extends LiveEntity { +private[spark] class LiveStage extends LiveEntity { import LiveEntityHelpers._ @@ -370,6 +374,8 @@ private class LiveStage extends LiveEntity { var completedTasks = 0 var failedTasks = 0 val completedIndices = new OpenHashSet[Int]() + // will only be set when recover LiveStage is needed. + var completedIndicesNum = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -405,7 +411,7 @@ private class LiveStage extends LiveEntity { numCompleteTasks = completedTasks, numFailedTasks = failedTasks, numKilledTasks = killedTasks, - numCompletedIndices = completedIndices.size, + numCompletedIndices = completedIndices.size + completedIndicesNum, submissionTime = info.submissionTime.map(new Date(_)), firstTaskLaunchedTime = @@ -458,7 +464,7 @@ private class LiveStage extends LiveEntity { } -private class LiveRDDPartition(val blockName: String) { +private[spark] class LiveRDDPartition(val blockName: String) { import LiveEntityHelpers._ @@ -489,7 +495,7 @@ private class LiveRDDPartition(val blockName: String) { } -private class LiveRDDDistribution(exec: LiveExecutor) { +private[spark] class LiveRDDDistribution(exec: LiveExecutor) { import LiveEntityHelpers._ @@ -506,6 +512,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { def toApi(): v1.RDDDataDistribution = { if (lastUpdate == null) { lastUpdate = new v1.RDDDataDistribution( + executorId, weakIntern(exec.hostPort), memoryUsed, exec.maxMemory - exec.memoryUsed, @@ -520,7 +527,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { } -private class LiveRDD(val info: RDDInfo) extends LiveEntity { +private[spark] class LiveRDD(val info: RDDInfo) extends LiveEntity { import LiveEntityHelpers._ @@ -528,10 +535,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { var memoryUsed = 0L var diskUsed = 0L - private val partitions = new HashMap[String, LiveRDDPartition]() - private val partitionSeq = new RDDPartitionSeq() + private[spark] val partitions = new HashMap[String, LiveRDDPartition]() + private[spark] val partitionSeq = new RDDPartitionSeq() - private val distributions = new HashMap[String, LiveRDDDistribution]() + private[spark] val distributions = new HashMap[String, LiveRDDDistribution]() def setStorageLevel(level: String): Unit = { this.storageLevel = weakIntern(level) @@ -589,7 +596,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { } -private class SchedulerPool(name: String) extends LiveEntity { +private[spark] class SchedulerPool(val name: String) extends LiveEntity { var stageIds = Set[Int]() @@ -739,7 +746,7 @@ private object LiveEntityHelpers { * Internally, the sequence is mutable, and elements can modify the data they expose. Additions and * removals are O(1). It is not safe to do multiple writes concurrently. */ -private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { +private[spark] class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { @volatile private var _head: LiveRDDPartition = null @volatile private var _tail: LiveRDDPartition = null diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 5ec9b36393764..8cda304c2322d 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -31,6 +31,7 @@ import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.resource.ResourceInformation +import org.apache.spark.status.{LiveExecutor, LiveRDDDistribution, LiveRDDPartition} case class ApplicationInfo private[spark]( id: String, @@ -181,6 +182,7 @@ class RDDStorageInfo private[spark]( val partitions: Option[Seq[RDDPartitionInfo]]) class RDDDataDistribution private[spark]( + val executorId: String, val address: String, val memoryUsed: Long, val memoryRemaining: Long, @@ -192,14 +194,34 @@ class RDDDataDistribution private[spark]( @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryRemaining: Option[Long], @JsonDeserialize(contentAs = classOf[JLong]) - val offHeapMemoryRemaining: Option[Long]) + val offHeapMemoryRemaining: Option[Long]) { + + def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor]) + : LiveRDDDistribution = { + val exec = executors.get(executorId).get + val liveRDDDistribution = new LiveRDDDistribution(exec) + liveRDDDistribution.memoryUsed = memoryUsed + liveRDDDistribution.diskUsed = diskUsed + liveRDDDistribution.onHeapUsed = onHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.offHeapUsed = offHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.lastUpdate = this + liveRDDDistribution + } +} class RDDPartitionInfo private[spark]( val blockName: String, val storageLevel: String, val memoryUsed: Long, val diskUsed: Long, - val executors: Seq[String]) + val executors: Seq[String]) { + + def toLiveRDDPartition: LiveRDDPartition = { + val liveRDDPartition = new LiveRDDPartition(blockName) + liveRDDPartition.value = this + liveRDDPartition + } +} class StageData private[spark]( val status: StageStatus, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9da5bea8bf5c4..00e2dd64b031d 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -23,8 +23,11 @@ import java.util.Date import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.scheduler.{StageInfo, TaskInfo, TaskLocality} import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1._ +import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.scope._ import org.apache.spark.util.kvstore.KVIndex @@ -59,6 +62,36 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { @JsonIgnore @KVIndex("host") val host: String = info.hostPort.split(":")(0) + def toLiveExecutor: LiveExecutor = { + val liveExecutor = new LiveExecutor(info.id, info.addTime.getTime) + liveExecutor.hostPort = info.hostPort + liveExecutor.host = info.hostPort.split(":")(0) + liveExecutor.totalCores = info.totalCores + liveExecutor.rddBlocks = info.rddBlocks + liveExecutor.memoryUsed = info.memoryUsed + liveExecutor.diskUsed = info.diskUsed + liveExecutor.maxTasks = info.maxTasks + liveExecutor.maxMemory = info.maxMemory + liveExecutor.totalTasks = info.totalTasks + liveExecutor.activeTasks = info.activeTasks + liveExecutor.completedTasks = info.completedTasks + liveExecutor.failedTasks = info.failedTasks + liveExecutor.totalDuration = info.totalDuration + liveExecutor.totalGcTime = info.totalGCTime + liveExecutor.totalInputBytes = info.totalInputBytes + liveExecutor.totalShuffleRead = info.totalShuffleRead + liveExecutor.totalShuffleWrite = info.totalShuffleWrite + liveExecutor.isBlacklisted = info.isBlacklisted + liveExecutor.blacklistedInStages = info.blacklistedInStages + liveExecutor.executorLogs = info.executorLogs + liveExecutor.attributes = info.attributes + liveExecutor.totalOnHeap = info.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(-1) + liveExecutor.totalOffHeap = info.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0) + liveExecutor.usedOnHeap = info.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0) + liveExecutor.usedOffHeap = info.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0) + liveExecutor.peakExecutorMetrics = info.peakMemoryMetrics.getOrElse(new ExecutorMetrics()) + liveExecutor + } } /** @@ -76,6 +109,29 @@ private[spark] class JobDataWrapper( @JsonIgnore @KVIndex("completionTime") private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) + + def toLiveJob: LiveJob = { + val liveJob = new LiveJob( + info.jobId, + info.name, + info.submissionTime, + info.stageIds, + info.jobGroup, + info.numTasks, + sqlExecutionId) + liveJob.activeTasks = info.numActiveTasks + liveJob.completedTasks = info.numCompletedTasks + liveJob.failedTasks = info.numFailedTasks + liveJob.completedIndicesNum = info.numCompletedIndices + liveJob.killedTasks = info.numKilledTasks + liveJob.killedSummary = info.killedTasksSummary + liveJob.skippedTasks = info.numSkippedTasks + liveJob.skippedStages = skippedStages + liveJob.completedStagesNum = info.numCompletedStages + liveJob.activeStages = info.numActiveStages + liveJob.failedStages = info.numFailedStages + liveJob + } } private[spark] class StageDataWrapper( @@ -95,6 +151,65 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex("completionTime") private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) + + def toLiveStage(jobs: Seq[LiveJob]): LiveStage = { + val liveStage = new LiveStage + val firstLaunchTime = if (info.firstTaskLaunchedTime.isEmpty) { + Long.MaxValue + } else { + info.firstTaskLaunchedTime.get.getTime + } + val metrics = LiveEntityHelpers.createMetrics( + info.executorDeserializeTime, + info.executorDeserializeCpuTime, + info.executorRunTime, + info.executorCpuTime, + info.resultSize, + info.jvmGcTime, + info.resultSerializationTime, + info.memoryBytesSpilled, + info.diskBytesSpilled, + info.peakExecutionMemory, + info.inputBytes, + info.inputRecords, + info.outputBytes, + info.outputRecords, + info.shuffleRemoteBlocksFetched, + info.shuffleLocalBlocksFetched, + info.shuffleFetchWaitTime, + info.shuffleRemoteBytesRead, + info.shuffleRemoteBytesReadToDisk, + info.shuffleLocalBytesRead, + info.shuffleReadRecords, + info.shuffleWriteBytes, + info.shuffleWriteTime, + info.shuffleWriteRecords + ) + val stageInfo = new StageInfo( + info.stageId, + info.attemptId, + info.name, + info.numTasks, + Nil, // rddInfo + Nil, // parentIds + info.details) + liveStage.jobs = jobs + liveStage.jobIds = jobs.map(_.jobId).toSet + liveStage.info = stageInfo + liveStage.status = info.status + liveStage.description = info.description + liveStage.schedulingPool = info.schedulingPool + liveStage.activeTasks = info.numActiveTasks + liveStage.completedTasks = info.numCompleteTasks + liveStage.failedTasks = info.numFailedTasks + liveStage.completedIndicesNum = info.numCompletedIndices + liveStage.killedTasks = info.numKilledTasks + liveStage.killedSummary = info.killedTasksSummary + liveStage.firstLaunchTime = firstLaunchTime + liveStage.localitySummary = locality + liveStage.metrics = metrics + liveStage + } } /** @@ -290,6 +405,23 @@ private[spark] class TaskDataWrapper( gettingResultTime = 0L) } + def toLiveTask: LiveTask = { + val taskInfo = + new TaskInfo( + taskId, + index, + attempt, + launchTime, + executorId, + host, + TaskLocality.withName(taskLocality), + speculative) + taskInfo.gettingResultTime = gettingResultTime + val lastUpdateTime = duration + launchTime + val liveTask = new LiveTask(taskInfo, stageId, stageAttemptId, Some(lastUpdateTime)) + liveTask + } + @JsonIgnore @KVIndex(TaskIndexNames.STAGE) private def stage: Array[Int] = Array(stageId, stageAttemptId) @@ -360,6 +492,31 @@ private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { @JsonIgnore @KVIndex("cached") def cached: Boolean = info.numCachedPartitions > 0 + def toLiveRDD(executors: scala.collection.Map[String, LiveExecutor]): LiveRDD = { + val rddInfo = new RDDInfo( + info.id, + info.name, + info.numPartitions, + StorageLevel.fromDescription(info.storageLevel), + false, + Nil) + val liveRDD = new LiveRDD(rddInfo) + liveRDD.memoryUsed = info.memoryUsed + liveRDD.diskUsed = info.diskUsed + info.partitions.get.foreach { rddPartition => + val liveRDDPartition = rddPartition.toLiveRDDPartition + liveRDD.partitions.put(rddPartition.blockName, liveRDDPartition) + liveRDD.partitionSeq.addPartition(liveRDDPartition) + } + if (info.dataDistribution.nonEmpty) { + info.dataDistribution.get.foreach { rddDist => + val liveRDDDist = rddDist.toLiveRDDDistribution(executors) + liveRDD.distributions.put(liveRDDDist.executorId, liveRDDDist) + } + } + liveRDD + } + } private[spark] class ExecutorStageSummaryWrapper( @@ -377,6 +534,41 @@ private[spark] class ExecutorStageSummaryWrapper( @JsonIgnore def id: Array[Any] = _id + def toLiveExecutorStageSummary: LiveExecutorStageSummary = { + val liveESSummary = new LiveExecutorStageSummary(stageId, stageAttemptId, executorId) + val metrics = LiveEntityHelpers.createMetrics( + executorDeserializeTime = 0, + executorDeserializeCpuTime = 0, + executorRunTime = 0, + executorCpuTime = 0, + resultSize = 0, + jvmGcTime = 0, + resultSerializationTime = 0, + memoryBytesSpilled = info.memoryBytesSpilled, + diskBytesSpilled = info.diskBytesSpilled, + peakExecutionMemory = 0, + inputBytesRead = info.inputBytes, + inputRecordsRead = info.inputRecords, + outputBytesWritten = info.outputBytes, + outputRecordsWritten = info.outputRecords, + shuffleRemoteBlocksFetched = 0, + shuffleLocalBlocksFetched = 0, + shuffleFetchWaitTime = 0, + shuffleRemoteBytesRead = info.shuffleRead, + shuffleRemoteBytesReadToDisk = 0, + shuffleLocalBytesRead = 0, + shuffleRecordsRead = info.shuffleReadRecords, + shuffleBytesWritten = info.shuffleWrite, + shuffleWriteTime = 0, + shuffleRecordsWritten = info.shuffleWriteRecords) + liveESSummary.taskTime = info.taskTime + liveESSummary.succeededTasks = info.succeededTasks + liveESSummary.failedTasks = info.failedTasks + liveESSummary.isBlacklisted = info.isBlacklistedForStage + liveESSummary.metrics = metrics + liveESSummary + } + } private[spark] class StreamBlockData( @@ -429,7 +621,14 @@ private[spark] class RDDOperationGraphWrapper( private[spark] class PoolData( @KVIndexParam val name: String, - val stageIds: Set[Int]) + val stageIds: Set[Int]) { + + def toSchedulerPool: SchedulerPool = { + val pool = new SchedulerPool(name) + pool.stageIds = stageIds + pool + } +} /** * A class with information about an app, to be used by the UI. There's only one instance of diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4c6998d7a8e20..2ab3128b0428d 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -163,6 +163,20 @@ object StorageLevel { val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) + /** + * :: DeveloperApi :: + * Return the StorageLevel object with the specified description. + */ + @DeveloperApi + def fromDescription(desc: String): StorageLevel = { + val (useDisk_, useMemory_, useOffHeap_, deserialized_) = { + (desc.contains("Disk"), desc.contains("Memory"), + desc.contains("off heap"), desc.contains("Deserialized")) + } + val replication_ = desc.split(" ").takeRight(2)(0).dropRight(1).toInt + new StorageLevel(useDisk_, useMemory_, useOffHeap_, deserialized_, replication_) + } + /** * :: DeveloperApi :: * Return the StorageLevel object with the specified name. diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 4b71a4844bde1..f5f43dfddf0d0 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -23,6 +23,7 @@ import java.util.{Date, Properties} import scala.collection.JavaConverters._ import scala.collection.immutable.Map import scala.reflect.{classTag, ClassTag} +import scala.util.Random import org.scalatest.BeforeAndAfter @@ -35,6 +36,7 @@ import org.apache.spark.scheduler.cluster._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.InMemoryStore class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { @@ -1624,6 +1626,337 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("recover live entities from KVStore") { + def assertListenerEquals(live: AppStatusListener, nonLive: AppStatusListener) + : Unit = { + // ensures all live entities are wrote into KVStore + live.flush() + nonLive.clearLiveEntities() + nonLive.recoverLiveEntities() + assertLiveEntityEquals(live, nonLive) + } + + val kvstore = new ElementTrackingStore(new InMemoryStore, conf) + val liveListener = new AppStatusListener(kvstore, conf, live = true) + val nonLiveListener = new AppStatusListener(kvstore, conf, live = false) + var time = 1L + liveListener.onApplicationStart(SparkListenerApplicationStart( + "test", Some("appId"), time, "spark", Some("appId-attempt"))) + time += 1 + + val exec0 = createExecutorAddedEvent(0) + val exec1 = createExecutorAddedEvent(1) + liveListener.onExecutorAdded(exec0) + liveListener.onExecutorAdded(exec1) + assert(liveListener.liveExecutors.size === 2) + // hostPort is needed in LiveRDDDistribution + liveListener.liveExecutors.get("0").get.hostPort = exec0.executorInfo.executorHost + liveListener.liveExecutors.get("1").get.hostPort = exec1.executorInfo.executorHost + assertListenerEquals(liveListener, nonLiveListener) + + val level = StorageLevel.MEMORY_AND_DISK + val rddInfo0 = new RDDInfo(0, "rdd-0", 1, level, false, Nil) + val rddInfo1 = new RDDInfo(1, "rdd-1", 1, level, false, Nil) + val stage0 = createStageInfo(stageId = 0, attemptId = 0, 2, Seq(rddInfo0)) + val stage1 = createStageInfo(stageId = 1, attemptId = 0, 2, Seq(rddInfo1)) + val jobId = 0 + liveListener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage0, stage1))) + assert(liveListener.liveJobs.size === 1) + assert(liveListener.liveStages.size === 2) + assertListenerEquals(liveListener, nonLiveListener) + time +=1 + + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage0)) + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage1)) + assert(liveListener.liveRDDs.size === 2) + assertListenerEquals(liveListener, nonLiveListener) + time +=1 + + liveListener.onExecutorBlacklisted(SparkListenerExecutorBlacklisted(time, "0", 0)) + assert(liveListener.liveExecutors.get("0").get.isBlacklisted) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + liveListener.onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted(time, "0")) + assert(!liveListener.liveExecutors.get("0").get.isBlacklisted) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + val tasks = createTasks(4, Array("0", "1")) + // update some metrics for stages in order to validate metrics equation below + liveListener.liveStages.values().asScala.foreach { stage => + stage.metrics = createRandomV1TaskMetrics() + } + Seq(stage0, stage1).foreach { stage => + liveListener.onTaskStart(SparkListenerTaskStart( + stage.stageId, stage.attemptNumber(), tasks(stage.stageId * 2))) + liveListener.onTaskStart(SparkListenerTaskStart( + stage.stageId, stage.attemptNumber(), tasks(stage.stageId * 2 + 1))) + } + assert(liveListener.liveTasks.size === 4) + assertListenerEquals(liveListener, nonLiveListener) + time +=1 + + val bm0 = BlockManagerId("0", exec0.executorInfo.executorHost, 1234) + val rdd0 = RddBlock(0, 0, 1L, 2L) + val bm1 = BlockManagerId("1", exec1.executorInfo.executorHost, 4321) + val rdd1 = RddBlock(1, 0, 3L, 4L) + liveListener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm0, rdd0.blockId, level, rdd0.memSize, rdd0.diskSize))) + liveListener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, rdd1.blockId, level, rdd1.memSize, rdd1.diskSize))) + assertListenerEquals(liveListener, nonLiveListener) + + liveListener.onUnpersistRDD(SparkListenerUnpersistRDD(rddInfo0.id)) + assert(liveListener.liveRDDs.size === 1) + assertListenerEquals(liveListener, nonLiveListener) + liveListener.onUnpersistRDD(SparkListenerUnpersistRDD(rddInfo1.id)) + assert(liveListener.liveRDDs.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + + val executorMetrics = new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, + 30L, 100L, 55L, 70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L)) + // finish task 0 and task 2 in stages + val task0 = tasks(stage0.stageId * 2) + val task2 = tasks(stage1.stageId * 2) + task0.finishTime = time + task2.finishTime = time + liveListener.onTaskEnd(SparkListenerTaskEnd(stage0.stageId, stage0.attemptNumber(), "task 0", + Success, task0, executorMetrics, createRandomTaskMetrics())) + liveListener.onTaskEnd(SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber(), "task 2", + Success, task2, executorMetrics, createRandomTaskMetrics())) + assert(liveListener.liveTasks.size === 2) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + // finish task 1 and task 3 in stages + val task1 = tasks(stage0.stageId * 2 + 1) + val task3 = tasks(stage1.stageId * 2 + 1) + task1.finishTime = time + task3.finishTime = time + liveListener.onTaskEnd(SparkListenerTaskEnd(stage0.stageId, stage0.attemptNumber(), "task 1", + Success, task1, executorMetrics, createRandomTaskMetrics())) + liveListener.onTaskEnd(SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber(), "task 3", + Success, task3, executorMetrics, createRandomTaskMetrics())) + assert(liveListener.liveTasks.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + + liveListener.onStageCompleted(SparkListenerStageCompleted(stage0)) + liveListener.onStageCompleted(SparkListenerStageCompleted(stage1)) + assert(liveListener.liveStages.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onJobEnd(SparkListenerJobEnd(jobId, time, JobSucceeded)) + assert(liveListener.liveJobs.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onApplicationEnd(SparkListenerApplicationEnd(time)) + assertListenerEquals(liveListener, nonLiveListener) + } + + private def assertLiveEntityEquals(src: AppStatusListener, dest: AppStatusListener) + : Unit = { + def assertLiveJobEquals(sJob: LiveJob, dJob: LiveJob): Unit = { + assert(sJob.jobId === dJob.jobId) + assert(sJob.name === dJob.name) + assert(sJob.submissionTime === dJob.submissionTime) + assert(sJob.stageIds === dJob.stageIds) + assert(sJob.jobGroup == dJob.jobGroup) + assert(sJob.numTasks == dJob.numTasks) + assert(sJob.sqlExecutionId == dJob.sqlExecutionId) + } + + def assertStageInfoEquals(sSInfo: StageInfo, dSInfo: StageInfo): Unit = { + assert(sSInfo.stageId === dSInfo.stageId) + assert(sSInfo.attemptNumber() === dSInfo.attemptNumber()) + assert(sSInfo.name === dSInfo.name) + assert(sSInfo.numTasks === dSInfo.numTasks) + assert(sSInfo.details === dSInfo.details) + } + + def assertTaskMetricsEquals( + sTM: v1.TaskMetrics, + dTM: v1.TaskMetrics): Unit = { + assert(sTM.executorDeserializeTime === dTM.executorDeserializeTime) + assert(sTM.executorDeserializeCpuTime === dTM.executorDeserializeCpuTime) + assert(sTM.executorRunTime === dTM.executorRunTime) + assert(sTM.executorCpuTime === dTM.executorCpuTime) + assert(sTM.resultSize === dTM.resultSize) + assert(sTM.jvmGcTime === dTM.jvmGcTime) + assert(sTM.resultSerializationTime === dTM.resultSerializationTime) + assert(sTM.memoryBytesSpilled === dTM.memoryBytesSpilled) + assert(sTM.diskBytesSpilled === dTM.diskBytesSpilled) + assert(sTM.peakExecutionMemory === dTM.peakExecutionMemory) + + val sIM = sTM.inputMetrics + val dIM = dTM.inputMetrics + assert(sIM.bytesRead === dIM.bytesRead) + assert(sIM.recordsRead === dIM.recordsRead) + + val sOM = sTM.outputMetrics + val dOM = dTM.outputMetrics + assert(sOM.bytesWritten === dOM.bytesWritten) + assert(sOM.recordsWritten === dOM.recordsWritten) + + val sRM = sTM.shuffleReadMetrics + val dRM = dTM.shuffleReadMetrics + assert(sRM.remoteBlocksFetched === dRM.remoteBlocksFetched) + assert(sRM.localBlocksFetched === dRM.localBlocksFetched) + assert(sRM.fetchWaitTime === dRM.fetchWaitTime) + assert(sRM.remoteBytesRead === dRM.remoteBytesRead) + assert(sRM.remoteBytesReadToDisk === dRM.remoteBytesReadToDisk) + assert(sRM.localBytesRead === dRM.localBytesRead) + assert(sRM.recordsRead === dRM.recordsRead) + + val sWM = sTM.shuffleWriteMetrics + val dWM = sTM.shuffleWriteMetrics + assert(sWM.bytesWritten === dWM.bytesWritten) + assert(sWM.writeTime === dWM.writeTime) + assert(sWM.recordsWritten === dWM.recordsWritten) + } + + def assertTaskInfoEquals(sTInfo: TaskInfo, dTInfo: TaskInfo): Unit = { + assert(sTInfo.taskId === dTInfo.taskId) + assert(sTInfo.index === dTInfo.index) + assert(sTInfo.attemptNumber === dTInfo.attemptNumber) + assert(sTInfo.launchTime === dTInfo.launchTime) + assert(sTInfo.executorId === dTInfo.executorId) + assert(sTInfo.host === dTInfo.host) + assert(sTInfo.taskLocality === dTInfo.taskLocality) + assert(sTInfo.speculative === dTInfo.speculative) + } + val srcExecutors = src.liveExecutors + val destExecutors = dest.liveExecutors + assert(srcExecutors.size === destExecutors.size) + srcExecutors.keys.foreach { execId => + val sExec = srcExecutors.get(execId).get + val dExec = destExecutors.get(execId).get + assert(sExec.addTime === dExec.addTime) + assert(sExec.host === dExec.host) + assert(sExec.hostPort === dExec.hostPort) + assert(sExec.totalCores === dExec.totalCores) + assert(sExec.rddBlocks === dExec.rddBlocks) + assert(sExec.memoryUsed === dExec.memoryUsed) + assert(sExec.diskUsed === dExec.diskUsed) + assert(sExec.maxTasks === dExec.maxTasks) + assert(sExec.maxMemory === dExec.maxMemory) + assert(sExec.totalTasks === dExec.totalTasks) + assert(sExec.activeTasks === dExec.activeTasks) + assert(sExec.completedTasks === dExec.completedTasks) + assert(sExec.failedTasks === dExec.failedTasks) + assert(sExec.totalDuration === dExec.totalDuration) + assert(sExec.totalGcTime === dExec.totalGcTime) + assert(sExec.totalInputBytes === dExec.totalInputBytes) + assert(sExec.totalShuffleRead === dExec.totalShuffleRead) + assert(sExec.totalShuffleWrite === dExec.totalShuffleWrite) + assert(sExec.isBlacklisted === dExec.isBlacklisted) + assert(sExec.blacklistedInStages === dExec.blacklistedInStages) + // return false indicates that there're no updates between these two metrics + assert(!sExec.peakExecutorMetrics.compareAndUpdatePeakValues(dExec.peakExecutorMetrics)) + } + + val srcJobs = src.liveJobs + val destJobs = dest.liveJobs + assert(srcJobs.size === destJobs.size) + srcJobs.keys.foreach { jobId => + val sJob = srcJobs.get(jobId).get + val dJob = destJobs.get(jobId).get + assertLiveJobEquals(sJob, dJob) + } + val srcStages = src.liveStages + val destStages = dest.liveStages + assert(srcStages.size() === destStages.size()) + srcStages.keys().asScala.foreach { stageId => + val sStage = srcStages.get(stageId) + val dStage = destStages.get(stageId) + val sStageJobs = sStage.jobs.sortBy(_.jobId) + val dStageJobs = dStage.jobs.sortBy(_.jobId) + assert(sStageJobs.size === dStageJobs.size) + sStageJobs.zip(dStageJobs).foreach {case (sJob, dJob) => + assertLiveJobEquals(sJob, dJob) } + assert(sStage.jobIds.size === dStage.jobs.size) + assert(sStage.jobIds === dStage.jobIds) + assertStageInfoEquals(sStage.info, dStage.info) + assert(sStage.status === dStage.status) + assert(sStage.description === dStage.description) + assert(sStage.schedulingPool === dStage.schedulingPool) + assert(sStage.activeTasks === dStage.activeTasks) + assert(sStage.completedTasks === dStage.completedTasks) + assert(sStage.completedIndices.size === dStage.completedIndicesNum) + assert(sStage.killedTasks === dStage.killedTasks) + assert(sStage.killedSummary === dStage.killedSummary) + assert(sStage.firstLaunchTime === dStage.firstLaunchTime) + assert(sStage.localitySummary === dStage.localitySummary) + assertTaskMetricsEquals(sStage.metrics, dStage.metrics) + val sSummaries = sStage.executorSummaries + val dSummaries = dStage.executorSummaries + assert(sSummaries.size === dSummaries.size) + sSummaries.keys.foreach { execId => + val sSummary = sSummaries.get(execId).get + val dSummary = dSummaries.get(execId).get + assert(sSummary.executorId === dSummary.executorId) + assert(sSummary.taskTime === dSummary.taskTime) + assert(sSummary.succeededTasks === dSummary.succeededTasks) + assert(sSummary.failedTasks === dSummary.failedTasks) + assert(sSummary.killedTasks === dSummary.killedTasks) + assert(sSummary.isBlacklisted === dSummary.isBlacklisted) + } + // we only compare executors with active tasks to those recovered executors, + // because executors with non active tasks wouldn't be recovered. + assert(sStage.activeTasksPerExecutor.filter(_._2 > 0) === dStage.activeTasksPerExecutor) + assert(sStage.blackListedExecutors === dStage.blackListedExecutors) + } + val srcTasks = src.liveTasks + val destTasks = dest.liveTasks + assert(srcTasks.size === destTasks.size) + srcTasks.keys.foreach { taskId => + val sTask = srcTasks.get(taskId).get + val dTask = destTasks.get(taskId).get + assertTaskInfoEquals(sTask.info, dTask.info) + } + val srcRDDs = src.liveRDDs + val destRDDs = dest.liveRDDs + assert(srcRDDs.size === destRDDs.size) + srcRDDs.keys.foreach { rddId => + val sRDD = srcRDDs.get(rddId).get + val dRDD = destRDDs.get(rddId).get + assert(sRDD.info.id === dRDD.info.id) + assert(sRDD.info.name === dRDD.info.name) + assert(sRDD.info.numPartitions === dRDD.info.numPartitions) + assert(sRDD.info.storageLevel === dRDD.info.storageLevel) + assert(sRDD.memoryUsed === dRDD.memoryUsed) + assert(sRDD.diskUsed === dRDD.diskUsed) + val sRDDPartitions = sRDD.partitions + val dRDDPartitions = dRDD.partitions + assert(sRDDPartitions.size === dRDDPartitions.size) + sRDDPartitions.keys.foreach { block => + val sPartition = sRDDPartitions.get(block).get + val dPartition = dRDDPartitions.get(block).get + assert(sPartition.executors === dPartition.executors) + assert(sPartition.memoryUsed === dPartition.memoryUsed) + assert(sPartition.diskUsed === dPartition.diskUsed) + } + val sRDDDists = sRDD.distributions + val dRDDDists = dRDD.distributions + assert(sRDDDists.size === dRDDDists.size) + sRDDDists.keys.foreach { execId => + val sDist = sRDDDists.get(execId).get + val dDist = dRDDDists.get(execId).get + assert(sDist.executorId === dDist.executorId) + assert(sDist.memoryUsed === dDist.memoryUsed) + assert(sDist.diskUsed === dDist.diskUsed) + } + } + val srcPools = src.pools + val destPools = dest.pools + srcPools.keys.foreach { name => + val sPool = srcPools.get(name).get + val dPool = destPools.get(name).get + assert(sPool.name === dPool.name) + assert(sPool.stageIds === dPool.stageIds) + } + } private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber) @@ -1697,4 +2030,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) } + + private def createStageInfo(stageId: Int, attemptId: Int, numTasks: Int, rddInfos: Seq[RDDInfo]) + : StageInfo = { + new StageInfo(stageId = stageId, + attemptId = attemptId, + name = s"stage-$stageId-$attemptId", + numTasks = numTasks, + rddInfos = rddInfos, + parentIds = Nil, + details = s"stage-$stageId-$attemptId") + } + + private def createRandomV1TaskMetrics(): v1.TaskMetrics = { + val rnd = new Random(System.nanoTime()) + LiveEntityHelpers.createMetrics( + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong() + ) + } + + private def createRandomTaskMetrics(): TaskMetrics = { + val rnd = new Random(System.nanoTime()) + val taskMetrics = new TaskMetrics() + taskMetrics.setExecutorDeserializeTime(rnd.nextLong()) + taskMetrics.setExecutorDeserializeCpuTime(rnd.nextLong()) + taskMetrics.setExecutorRunTime(rnd.nextLong()) + taskMetrics.setExecutorCpuTime(rnd.nextLong()) + taskMetrics.setResultSize(rnd.nextLong()) + taskMetrics.setJvmGCTime(rnd.nextLong()) + taskMetrics.setResultSerializationTime(rnd.nextLong()) + taskMetrics.incMemoryBytesSpilled(rnd.nextLong()) + taskMetrics.incDiskBytesSpilled(rnd.nextLong()) + taskMetrics.incPeakExecutionMemory(rnd.nextLong()) + taskMetrics + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 2c4a7eacdf10b..1fe9f8e492d79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -42,8 +42,9 @@ class SQLAppStatusListener( // Live tracked data is needed by the SQL status store to calculate metrics for in-flight // executions; that means arbitrary threads may be querying these maps, so they need to be // thread-safe. - private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() - private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() + // variables are visible for tests. + private[spark] val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private[spark] val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() // Returns true if this listener has no live data. Exposed for tests only. private[sql] def noLiveData(): Boolean = { @@ -67,6 +68,27 @@ class SQLAppStatusListener( } } + // visible for tests + private[spark] def recoverLiveEntities(): Unit = { + if (!live) { + kvstore.view(classOf[SQLExecutionUIData]) + // null metricValues potentially indicates a live SQLExecutionUIData + .asScala.filter(_.metricValues == null) + .map(_.toLiveExecutionData) + .foreach(execData => liveExecutions.put(execData.executionId, execData)) + kvstore.view(classOf[SQLStageMetricsWrapper]) + .asScala.map(_.toLiveStageMetrics) + .foreach(metrics => stageMetrics.put(metrics.stageId, metrics)) + } + } + + // used for tests only + private[spark] def clearLiveEntities(): Unit = { + liveExecutions.clear() + stageMetrics.clear() + } + + override def onJobStart(event: SparkListenerJobStart): Unit = { val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) if (executionIdString == null) { @@ -82,17 +104,7 @@ class SQLAppStatusListener( // Should not overwrite the kvstore with new entry, if it already has the SQLExecution // data corresponding to the execId. val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId) - val executionData = new LiveExecutionData(executionId) - executionData.description = sqlStoreData.description - executionData.details = sqlStoreData.details - executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription - executionData.metrics = sqlStoreData.metrics - executionData.submissionTime = sqlStoreData.submissionTime - executionData.completionTime = sqlStoreData.completionTime - executionData.jobs = sqlStoreData.jobs - executionData.stages = sqlStoreData.stages - executionData.metricsValues = sqlStoreData.metricValues - executionData.endEvents = sqlStoreData.jobs.size + 1 + val executionData = sqlStoreData.toLiveExecutionData liveExecutions.put(executionId, executionData) Some(executionData) } catch { @@ -328,7 +340,18 @@ class SQLAppStatusListener( }.toSet stageMetrics.keySet().asScala .filter(!activeStages.contains(_)) - .foreach(stageMetrics.remove) + .foreach { stageId => + stageMetrics.remove(stageId) + if (live) { + try { + kvstore.delete(classOf[SQLStageMetricsWrapper], stageId) + } catch { + case _: NoSuchElementException => + // Ignore. It's possible that LiveStageMetrics is removed + // before its first time to write to KVStore + } + } + } } private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = { @@ -367,6 +390,16 @@ class SQLAppStatusListener( } } + // visible for tests + private[spark] def flush(): Unit = { + if (live) { + // TODO (wuyi) is this thread safe ? + val now = System.nanoTime() + liveExecutions.values().asScala.foreach(_.write(kvstore, now)) + stageMetrics.values().asScala.foreach(_.write(kvstore, now)) + } + } + private def isSQLStage(stageId: Int): Boolean = { liveExecutions.values().asScala.exists { exec => exec.stages.contains(stageId) @@ -389,7 +422,7 @@ class SQLAppStatusListener( } -private class LiveExecutionData(val executionId: Long) extends LiveEntity { +private[spark] class LiveExecutionData(val executionId: Long) extends LiveEntity { var description: String = null var details: String = null @@ -424,13 +457,28 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { } -private class LiveStageMetrics( +private[spark] class LiveStageMetrics ( val stageId: Int, var attemptId: Int, val accumulatorIds: Set[Long], val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics]) + extends LiveEntity { + + override protected def doUpdate() = { + new SQLStageMetricsWrapper( + stageId, + attemptId, + accumulatorIds, + taskMetrics.asScala.map { case (taskId, metrics) => + (taskId, metrics.toApi) + }.toMap + ) + } +} -private class LiveTaskMetrics( +private[spark] class LiveTaskMetrics( val ids: Array[Long], val values: Array[Long], - val succeeded: Boolean) + val succeeded: Boolean) { + def toApi: SQLTaskMetricsWrapper = new SQLTaskMetricsWrapper(ids, values, succeeded) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 241001a857c8f..03ef9a51dfa0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui import java.lang.{Long => JLong} import java.util.Date +import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -99,8 +100,58 @@ class SQLExecutionUIData( @JsonIgnore @KVIndex("completionTime") private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L) + + def toLiveExecutionData: LiveExecutionData = { + val liveExecutionData = new LiveExecutionData(executionId) + liveExecutionData.description = description + liveExecutionData.details = details + liveExecutionData.physicalPlanDescription = physicalPlanDescription + liveExecutionData.metrics = metrics + liveExecutionData.submissionTime = submissionTime + liveExecutionData.completionTime = completionTime + liveExecutionData.jobs = jobs + liveExecutionData.stages = stages + liveExecutionData.metricsValues = metricValues + val endNum = jobs.count { case (_, status) => + status == JobExecutionStatus.SUCCEEDED || status == JobExecutionStatus.FAILED } + liveExecutionData.endEvents = endNum + { if (completionTime.isDefined) 1 else 0 } + liveExecutionData + } } +/** + * Used to recover LiveStageMetrics in SQLAppStatusListener. It would be wrote into KVStore + * continuously and deleted when related LiveStageMetrics is no longer used in SQLAppStatusListener + */ +class SQLStageMetricsWrapper( + @KVIndexParam + val stageId: Int, + val attemptId: Int, + @JsonDeserialize(contentAs = classOf[Long]) + val accumulatorIds: Set[Long], + @JsonDeserialize(keyAs = classOf[Long], contentAs = classOf[LiveTaskMetrics]) + val taskMetrics: Map[Long, SQLTaskMetricsWrapper]) { + def toLiveStageMetrics: LiveStageMetrics = { + val liveTaskMetrics = new ConcurrentHashMap[Long, LiveTaskMetrics]() + val metricsMap = taskMetrics.map { case (taskId, metrics) => + (taskId, new LiveTaskMetrics(metrics.ids, metrics.values, metrics.succeeded)) + } + liveTaskMetrics.putAll(metricsMap.asJava) + new LiveStageMetrics( + stageId, + attemptId, + accumulatorIds, + liveTaskMetrics) + } +} + +class SQLTaskMetricsWrapper( + @JsonDeserialize(contentAs = classOf[Long]) + val ids: Array[Long], + @JsonDeserialize(contentAs = classOf[Long]) + val values: Array[Long], + val succeeded: Boolean) + class SparkPlanGraphWrapper( @KVIndexParam val executionId: Long, val nodes: Seq[SparkPlanGraphNodeWrapper], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 90966d2efec23..872c66a4e4b94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui import java.util.Properties +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import org.json4s.jackson.JsonMethods._ @@ -609,6 +610,101 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils assert(statusStore.executionsCount === 2) assert(statusStore.execution(2) === None) } + + test("recover live entities from KVStore") { + def assertSQLListenerEquals(live: SQLAppStatusListener, nonLive: SQLAppStatusListener) + : Unit = { + // ensures all live entities are wrote into KVStore + live.flush() + nonLive.clearLiveEntities() + nonLive.recoverLiveEntities() + assertSQLLiveEntityEquals(live, nonLive) + } + val conf = sparkContext.conf + kvstore = new ElementTrackingStore(new InMemoryStore, conf) + val liveListener = new SQLAppStatusListener(conf, kvstore, live = true) + val nonLiveListener = new SQLAppStatusListener(conf, kvstore, live = false) + var time = 1 + + liveListener.onOtherEvent(SparkListenerSQLExecutionStart( + 0, "desc", "details", "planDesc", + new SparkPlanInfo("node", "test", Nil, null, Nil), time + )) + assert(liveListener.liveExecutions.size() === 1) + assertSQLListenerEquals(liveListener, nonLiveListener) + time += 1 + + val stage0 = createStageInfo(stageId = 0, attemptId = 0) + val stage1 = createStageInfo(stageId = 1, attemptId = 0) + val properties = createProperties(executionId = 0) + liveListener.onJobStart(SparkListenerJobStart( + 0, time, Seq(stage0, stage1), properties)) + assert(liveListener.stageMetrics.size() === 2) + assertSQLListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage0)) + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage1)) + liveListener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate( + "execId-0", Seq((0, 0, 0, createAccumulatorInfos(Map((0, 0), (1, 1))))))) + liveListener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate( + "execId-0", Seq((0, 1, 1, createAccumulatorInfos(Map((0, 0), (1, 1))))))) + assertSQLListenerEquals(liveListener, nonLiveListener) + + liveListener.onJobEnd(SparkListenerJobEnd(0, time, JobSucceeded)) + assertSQLListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onOtherEvent(SparkListenerSQLExecutionEnd(0, time)) + assert(liveListener.liveExecutions.size() === 0) + assertSQLListenerEquals(liveListener, nonLiveListener) + + // SQLStageMetricsWrapper should be clean up after SQL executions end + assert(kvstore.count(classOf[SQLStageMetricsWrapper]) === 0) + } + + private def assertSQLLiveEntityEquals(src: SQLAppStatusListener, dest: SQLAppStatusListener) + : Unit = { + val srcExecutions = src.liveExecutions + val destExecutions = dest.liveExecutions + assert(srcExecutions.size() === destExecutions.size()) + srcExecutions.keys().asScala.foreach { id => + val srcExec = srcExecutions.get(id) + val destExec = destExecutions.get(id) + assert(srcExec.executionId === destExec.executionId) + assert(srcExec.description === destExec.description) + assert(srcExec.details === destExec.details) + assert(srcExec.physicalPlanDescription === destExec.physicalPlanDescription) + assert(srcExec.metrics === destExec.metrics) + assert(srcExec.submissionTime === destExec.submissionTime) + assert(srcExec.completionTime === destExec.completionTime) + assert(srcExec.jobs === destExec.jobs) + assert(srcExec.stages === destExec.stages) + assert(srcExec.driverAccumUpdates === destExec.driverAccumUpdates) + assert(srcExec.metricsValues === destExec.metricsValues) + assert(srcExec.endEvents === destExec.endEvents) + } + val srcStageMetrics = src.stageMetrics + val destStageMetrics = dest.stageMetrics + assert(srcStageMetrics.size() === destStageMetrics.size()) + srcStageMetrics.keys().asScala.foreach { id => + val srcStageM = srcStageMetrics.get(id) + val destStageM = destStageMetrics.get(id) + assert(srcStageM.stageId === destStageM.stageId) + assert(srcStageM.attemptId === destStageM.attemptId) + assert(srcStageM.accumulatorIds === destStageM.accumulatorIds) + val srcTaskMetrics = srcStageM.taskMetrics + val destTaskMetrics = destStageM.taskMetrics + assert(srcTaskMetrics.size() === destTaskMetrics.size()) + srcTaskMetrics.keys().asScala.foreach { id => + val srcTaskM = srcTaskMetrics.get(id) + val destTaskM = destTaskMetrics.get(id) + assert(srcTaskM.ids === destTaskM.ids) + assert(srcTaskM.values === destTaskM.values) + assert(srcTaskM.succeeded === destTaskM.succeeded) + } + } + } } From bee8325ccc2dd4fd11470560bcad300095dbf394 Mon Sep 17 00:00:00 2001 From: wuyi Date: Thu, 26 Sep 2019 20:31:14 +0800 Subject: [PATCH 02/21] remove personal TODO --- .../org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 1fe9f8e492d79..4e102b0c41b10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -393,7 +393,6 @@ class SQLAppStatusListener( // visible for tests private[spark] def flush(): Unit = { if (live) { - // TODO (wuyi) is this thread safe ? val now = System.nanoTime() liveExecutions.values().asScala.foreach(_.write(kvstore, now)) stageMetrics.values().asScala.foreach(_.write(kvstore, now)) From 51f7d317024b268686e4ed7abd8d306f65ffe9af Mon Sep 17 00:00:00 2001 From: wuyi Date: Wed, 9 Oct 2019 23:08:50 +0800 Subject: [PATCH 03/21] remove unnecessary closeableIterator() for task view --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 71bce283b26b0..603ce84f87645 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -146,7 +146,7 @@ private[spark] class AppStatusListener( .index(TaskIndexNames.STATUS) .first(TaskState.RUNNING.toString) .last(TaskState.RUNNING.toString) - .closeableIterator().asScala + .asScala .map(_.toLiveTask) .foreach { task => liveTasks.put(task.info.taskId, task) From 585dc150dfb3857d77fbba6b79700e903334017c Mon Sep 17 00:00:00 2001 From: wuyi Date: Wed, 9 Oct 2019 23:12:09 +0800 Subject: [PATCH 04/21] more empty lines for easier reading --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 603ce84f87645..0a57961ffbed8 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -152,8 +152,10 @@ private[spark] class AppStatusListener( liveTasks.put(task.info.taskId, task) stage.activeTasksPerExecutor(task.info.executorId) += 1 } + stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue()) } + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) kvstore.view(classOf[RDDStorageInfoWrapper]).asScala From 3a23f501919271ff5c7cac8fc05a7a700897e3c3 Mon Sep 17 00:00:00 2001 From: wuyi Date: Wed, 9 Oct 2019 23:18:33 +0800 Subject: [PATCH 05/21] recover ExecutorSummaryWrapper before ExecutorStageSummaryWrapper --- .../org/apache/spark/status/AppStatusListener.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 0a57961ffbed8..c75cd5fe41f3d 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -125,6 +125,9 @@ private[spark] class AppStatusListener( val stageAttempt = stage.info.attemptNumber() liveStages.put((stageId, stageAttempt), stage) + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) + .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) + kvstore.view(classOf[ExecutorStageSummaryWrapper]) .index("stage") .first(Array(stageId, stageAttempt)) @@ -135,12 +138,11 @@ private[spark] class AppStatusListener( stage.executorSummaries.put(esummary.executorId, esummary) if (esummary.isBlacklisted) { stage.blackListedExecutors += esummary.executorId - liveExecutors(esummary.executorId).isBlacklisted = true - liveExecutors(esummary.executorId).blacklistedInStages += stageId + liveExecutors.get(esummary.executorId).foreach(_.isBlacklisted = true) + liveExecutors.get(esummary.executorId).foreach(_.blacklistedInStages += stageId) } } - kvstore.view(classOf[TaskDataWrapper]) .parent(Array(stageId, stageAttempt)) .index(TaskIndexNames.STATUS) @@ -156,8 +158,6 @@ private[spark] class AppStatusListener( stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue()) } - kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) - .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) kvstore.view(classOf[RDDStorageInfoWrapper]).asScala .foreach { rddWrapper => val liveRdd = rddWrapper.toLiveRDD(liveExecutors) From 521ec7bf151a0de9b6bbb04a210262f41cdd3e65 Mon Sep 17 00:00:00 2001 From: wuyi Date: Wed, 9 Oct 2019 23:58:06 +0800 Subject: [PATCH 06/21] add empty line for convenient reading --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index c75cd5fe41f3d..76e76be6cc1be 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -163,6 +163,7 @@ private[spark] class AppStatusListener( val liveRdd = rddWrapper.toLiveRDD(liveExecutors) liveRDDs.put(liveRdd.info.id, liveRdd) } + kvstore.view(classOf[PoolData]).asScala.foreach { poolData => val schedulerPool = poolData.toSchedulerPool pools.put(schedulerPool.name, schedulerPool) From eb7a63a515861bc763a63f12579f0bc84ec05ea4 Mon Sep 17 00:00:00 2001 From: wuyi Date: Thu, 10 Oct 2019 00:10:22 +0800 Subject: [PATCH 07/21] remove redudant empty line --- .../org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 4e102b0c41b10..c564868e259d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -88,7 +88,6 @@ class SQLAppStatusListener( stageMetrics.clear() } - override def onJobStart(event: SparkListenerJobStart): Unit = { val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) if (executionIdString == null) { From 136ace49763610d6b4c7a6acb8a670993c2b1169 Mon Sep 17 00:00:00 2001 From: wuyi Date: Thu, 10 Oct 2019 00:12:53 +0800 Subject: [PATCH 08/21] remove redundant space --- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index c564868e259d3..25f976441369f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -455,7 +455,7 @@ private[spark] class LiveExecutionData(val executionId: Long) extends LiveEntity } -private[spark] class LiveStageMetrics ( +private[spark] class LiveStageMetrics( val stageId: Int, var attemptId: Int, val accumulatorIds: Set[Long], From 29972e44a59338ae4b876f5dfccc7e32c2ee50ee Mon Sep 17 00:00:00 2001 From: wuyi Date: Thu, 10 Oct 2019 00:29:54 +0800 Subject: [PATCH 09/21] wrote -> written --- .../org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 03ef9a51dfa0c..2ebb27e8e428e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -120,7 +120,7 @@ class SQLExecutionUIData( } /** - * Used to recover LiveStageMetrics in SQLAppStatusListener. It would be wrote into KVStore + * Used to recover LiveStageMetrics in SQLAppStatusListener. It would be written into KVStore * continuously and deleted when related LiveStageMetrics is no longer used in SQLAppStatusListener */ class SQLStageMetricsWrapper( From 2b8008e81e92907b1885202297274d93977e807c Mon Sep 17 00:00:00 2001 From: wuyi Date: Fri, 11 Oct 2019 22:31:20 +0800 Subject: [PATCH 10/21] format recoverLiveEntities() --- .../spark/status/AppStatusListener.scala | 69 +++++++++---------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 76e76be6cc1be..78b5e2d842086 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -115,48 +115,47 @@ private[spark] class AppStatusListener( .filter { stageData => stageData.info.status == v1.StageStatus.PENDING || stageData.info.status == v1.StageStatus.ACTIVE - } - .map { stageData => + }.map { stageData => val stageId = stageData.info.stageId val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq stageData.toLiveStage(jobs) }.foreach { stage => - val stageId = stage.info.stageId - val stageAttempt = stage.info.attemptNumber() - liveStages.put((stageId, stageAttempt), stage) - - kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) - .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) - - kvstore.view(classOf[ExecutorStageSummaryWrapper]) - .index("stage") - .first(Array(stageId, stageAttempt)) - .last(Array(stageId, stageAttempt)) - .asScala - .map(_.toLiveExecutorStageSummary) - .foreach { esummary => - stage.executorSummaries.put(esummary.executorId, esummary) - if (esummary.isBlacklisted) { - stage.blackListedExecutors += esummary.executorId - liveExecutors.get(esummary.executorId).foreach(_.isBlacklisted = true) - liveExecutors.get(esummary.executorId).foreach(_.blacklistedInStages += stageId) + val stageId = stage.info.stageId + val stageAttempt = stage.info.attemptNumber() + liveStages.put((stageId, stageAttempt), stage) + + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) + .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) + + kvstore.view(classOf[ExecutorStageSummaryWrapper]) + .index("stage") + .first(Array(stageId, stageAttempt)) + .last(Array(stageId, stageAttempt)) + .asScala + .map(_.toLiveExecutorStageSummary) + .foreach { esummary => + stage.executorSummaries.put(esummary.executorId, esummary) + if (esummary.isBlacklisted) { + stage.blackListedExecutors += esummary.executorId + liveExecutors.get(esummary.executorId).foreach(_.isBlacklisted = true) + liveExecutors.get(esummary.executorId).foreach(_.blacklistedInStages += stageId) + } } - } - kvstore.view(classOf[TaskDataWrapper]) - .parent(Array(stageId, stageAttempt)) - .index(TaskIndexNames.STATUS) - .first(TaskState.RUNNING.toString) - .last(TaskState.RUNNING.toString) - .asScala - .map(_.toLiveTask) - .foreach { task => - liveTasks.put(task.info.taskId, task) - stage.activeTasksPerExecutor(task.info.executorId) += 1 - } + kvstore.view(classOf[TaskDataWrapper]) + .parent(Array(stageId, stageAttempt)) + .index(TaskIndexNames.STATUS) + .first(TaskState.RUNNING.toString) + .last(TaskState.RUNNING.toString) + .asScala + .map(_.toLiveTask) + .foreach { task => + liveTasks.put(task.info.taskId, task) + stage.activeTasksPerExecutor(task.info.executorId) += 1 + } - stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue()) - } + stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue()) + } kvstore.view(classOf[RDDStorageInfoWrapper]).asScala .foreach { rddWrapper => From 8344d3ae3aedbe7ba5cd06965d933053a960a5af Mon Sep 17 00:00:00 2001 From: wuyi Date: Fri, 11 Oct 2019 22:39:34 +0800 Subject: [PATCH 11/21] rename xxxNum to numXXX --- .../scala/org/apache/spark/status/LiveEntity.scala | 12 ++++++------ .../scala/org/apache/spark/status/storeTypes.scala | 6 +++--- .../apache/spark/status/AppStatusListenerSuite.scala | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 02534841c1817..12928ce1d368e 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -75,7 +75,7 @@ private[spark] class LiveJob( // Holds both the stage ID and the task index, packed into a single long value. val completedIndices = new OpenHashSet[Long]() // will only be set when recover LiveJob is needed. - var completedIndicesNum = 0 + var numCompletedIndices = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -88,7 +88,7 @@ private[spark] class LiveJob( var completedStages: Set[Int] = Set() // will only be set when recover LiveJob is needed. - var completedStagesNum = 0 + var numCompletedStages = 0 var activeStages = 0 var failedStages = 0 @@ -108,9 +108,9 @@ private[spark] class LiveJob( skippedTasks, failedTasks, killedTasks, - completedIndices.size + completedIndicesNum, + completedIndices.size + numCompletedIndices, activeStages, - completedStages.size + completedStagesNum, + completedStages.size + numCompletedStages, skippedStages.size, failedStages, killedSummary) @@ -375,7 +375,7 @@ private[spark] class LiveStage extends LiveEntity { var failedTasks = 0 val completedIndices = new OpenHashSet[Int]() // will only be set when recover LiveStage is needed. - var completedIndicesNum = 0 + var numCompletedIndices = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -411,7 +411,7 @@ private[spark] class LiveStage extends LiveEntity { numCompleteTasks = completedTasks, numFailedTasks = failedTasks, numKilledTasks = killedTasks, - numCompletedIndices = completedIndices.size + completedIndicesNum, + numCompletedIndices = completedIndices.size + numCompletedIndices, submissionTime = info.submissionTime.map(new Date(_)), firstTaskLaunchedTime = diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 00e2dd64b031d..951c83e886573 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -122,12 +122,12 @@ private[spark] class JobDataWrapper( liveJob.activeTasks = info.numActiveTasks liveJob.completedTasks = info.numCompletedTasks liveJob.failedTasks = info.numFailedTasks - liveJob.completedIndicesNum = info.numCompletedIndices + liveJob.numCompletedIndices = info.numCompletedIndices liveJob.killedTasks = info.numKilledTasks liveJob.killedSummary = info.killedTasksSummary liveJob.skippedTasks = info.numSkippedTasks liveJob.skippedStages = skippedStages - liveJob.completedStagesNum = info.numCompletedStages + liveJob.numCompletedStages = info.numCompletedStages liveJob.activeStages = info.numActiveStages liveJob.failedStages = info.numFailedStages liveJob @@ -202,7 +202,7 @@ private[spark] class StageDataWrapper( liveStage.activeTasks = info.numActiveTasks liveStage.completedTasks = info.numCompleteTasks liveStage.failedTasks = info.numFailedTasks - liveStage.completedIndicesNum = info.numCompletedIndices + liveStage.numCompletedIndices = info.numCompletedIndices liveStage.killedTasks = info.numKilledTasks liveStage.killedSummary = info.killedTasksSummary liveStage.firstLaunchTime = firstLaunchTime diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index f5f43dfddf0d0..98db0b31b31fb 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1883,7 +1883,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(sStage.schedulingPool === dStage.schedulingPool) assert(sStage.activeTasks === dStage.activeTasks) assert(sStage.completedTasks === dStage.completedTasks) - assert(sStage.completedIndices.size === dStage.completedIndicesNum) + assert(sStage.completedIndices.size === dStage.numCompletedIndices) assert(sStage.killedTasks === dStage.killedTasks) assert(sStage.killedSummary === dStage.killedSummary) assert(sStage.firstLaunchTime === dStage.firstLaunchTime) From cbd9a8c72dfd9a6aa20cc868689bc670eaaa0a37 Mon Sep 17 00:00:00 2001 From: wuyi Date: Fri, 11 Oct 2019 23:22:26 +0800 Subject: [PATCH 12/21] add comment for toLiveStage --- core/src/main/scala/org/apache/spark/status/storeTypes.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 951c83e886573..69c966adf8cf6 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -193,6 +193,10 @@ private[spark] class StageDataWrapper( Nil, // rddInfo Nil, // parentIds info.details) + + // Note that attributes for `executorSummaries`, `activeTasksPerExecutor`, + // `blackListedExecutors`, `savedTasks` are computed later in + // AppStatusListener.recoverLiveEntities(). liveStage.jobs = jobs liveStage.jobIds = jobs.map(_.jobId).toSet liveStage.info = stageInfo From d0be7968f294e35a00384396567db07c37bd0e60 Mon Sep 17 00:00:00 2001 From: wuyi Date: Sun, 13 Oct 2019 21:02:15 +0800 Subject: [PATCH 13/21] add StorageLevelSuite for fromDescription() --- .../spark/storage/StorageLevelSuite.scala | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/storage/StorageLevelSuite.scala diff --git a/core/src/test/scala/org/apache/spark/storage/StorageLevelSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageLevelSuite.scala new file mode 100644 index 0000000000000..bdd931cbf3bb3 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/StorageLevelSuite.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.SparkFunSuite + + +class StorageLevelSuite extends SparkFunSuite{ + + + private def testFromDescription(oldLevel: StorageLevel): Unit = { + val desc = oldLevel.description + val newLevel = StorageLevel.fromDescription(desc) + assert(oldLevel === newLevel) + } + + test("from description") { + testFromDescription(StorageLevel.NONE) + testFromDescription(StorageLevel.DISK_ONLY) + testFromDescription(StorageLevel.DISK_ONLY_2) + testFromDescription(StorageLevel.MEMORY_ONLY) + testFromDescription(StorageLevel.MEMORY_ONLY_2) + testFromDescription(StorageLevel.MEMORY_ONLY_SER) + testFromDescription(StorageLevel.MEMORY_ONLY_SER_2) + testFromDescription(StorageLevel.MEMORY_AND_DISK) + testFromDescription(StorageLevel.MEMORY_AND_DISK_2) + testFromDescription(StorageLevel.MEMORY_AND_DISK_SER) + testFromDescription(StorageLevel.MEMORY_AND_DISK_SER_2) + testFromDescription(StorageLevel.OFF_HEAP) + } +} From b56e3aa91035076edfd13de9fd12f5d90755d328 Mon Sep 17 00:00:00 2001 From: wuyi Date: Sun, 13 Oct 2019 21:13:58 +0800 Subject: [PATCH 14/21] improve running check condition for SQLExecutionUIData --- .../spark/sql/execution/ui/SQLAppStatusListener.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 25f976441369f..8c1171f71d7fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -70,10 +70,12 @@ class SQLAppStatusListener( // visible for tests private[spark] def recoverLiveEntities(): Unit = { + def isRunningExecution(execUI: SQLExecutionUIData): Boolean = { + execUI.completionTime.isEmpty || execUI.jobs.exists(_._2 == JobExecutionStatus.RUNNING) + } if (!live) { kvstore.view(classOf[SQLExecutionUIData]) - // null metricValues potentially indicates a live SQLExecutionUIData - .asScala.filter(_.metricValues == null) + .asScala.filter(isRunningExecution) .map(_.toLiveExecutionData) .foreach(execData => liveExecutions.put(execData.executionId, execData)) kvstore.view(classOf[SQLStageMetricsWrapper]) From fbce6ed93191d93136ecb1bcdbbac96afd270344 Mon Sep 17 00:00:00 2001 From: wuyi Date: Sun, 13 Oct 2019 21:32:38 +0800 Subject: [PATCH 15/21] recover RDDInfos for LiveStage --- .../scala/org/apache/spark/status/storeTypes.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 69c966adf8cf6..5694efb3f1d8c 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -152,6 +152,14 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex("completionTime") private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) + private def idAwareRDDInfos(rddIds: Seq[Int]): Seq[RDDInfo] = { + // It's safe to give arbitrary values except id while recovering RDDInfo, + // since a running LiveStage only concerns about rddInfo's id. + rddIds.map { id => + new RDDInfo(id, id.toString, 0, null, false, Nil) + } + } + def toLiveStage(jobs: Seq[LiveJob]): LiveStage = { val liveStage = new LiveStage val firstLaunchTime = if (info.firstTaskLaunchedTime.isEmpty) { @@ -190,7 +198,7 @@ private[spark] class StageDataWrapper( info.attemptId, info.name, info.numTasks, - Nil, // rddInfo + idAwareRDDInfos(info.rddIds), Nil, // parentIds info.details) From 90601de4f42e2a97388794488f6e70c45d0312f8 Mon Sep 17 00:00:00 2001 From: wuyi Date: Sun, 13 Oct 2019 21:47:09 +0800 Subject: [PATCH 16/21] comment OK for recovering StageInfo --- core/src/main/scala/org/apache/spark/status/storeTypes.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 5694efb3f1d8c..2dc3eff972f60 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -193,6 +193,10 @@ private[spark] class StageDataWrapper( info.shuffleWriteTime, info.shuffleWriteRecords ) + + // parentIds, taskMetrics, taskLocalityPreferences and shuffleDepId aren't assigned here + // but it's also OK since a running LiveStage don't visit these attributes. And we'll + // get a complete StageInfo again when we receive SparkListenerStageCompleted event. val stageInfo = new StageInfo( info.stageId, info.attemptId, From a7d352d1cf766a23415c33e2ce47c110ff5759a4 Mon Sep 17 00:00:00 2001 From: wuyi Date: Sun, 13 Oct 2019 21:49:26 +0800 Subject: [PATCH 17/21] recover submissionTime for stageInfo --- core/src/main/scala/org/apache/spark/status/storeTypes.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 2dc3eff972f60..90c5c45912e1f 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -205,6 +205,7 @@ private[spark] class StageDataWrapper( idAwareRDDInfos(info.rddIds), Nil, // parentIds info.details) + stageInfo.submissionTime = info.submissionTime.map(_.getTime) // Note that attributes for `executorSummaries`, `activeTasksPerExecutor`, // `blackListedExecutors`, `savedTasks` are computed later in From 3512303e0cd0f10c63ee7117528b649d13fcc673 Mon Sep 17 00:00:00 2001 From: wuyi Date: Mon, 14 Oct 2019 22:17:31 +0800 Subject: [PATCH 18/21] recover live executors at right place --- .../scala/org/apache/spark/status/AppStatusListener.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 9f68f17f0ce35..abd3243e51590 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -111,6 +111,9 @@ private[spark] class AppStatusListener( .asScala.filter(_.info.status == JobExecutionStatus.RUNNING) .map(_.toLiveJob).foreach(job => liveJobs.put(job.jobId, job)) + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) + .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) + kvstore.view(classOf[StageDataWrapper]).asScala .filter { stageData => stageData.info.status == v1.StageStatus.PENDING || @@ -124,9 +127,6 @@ private[spark] class AppStatusListener( val stageAttempt = stage.info.attemptNumber() liveStages.put((stageId, stageAttempt), stage) - kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) - .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) - kvstore.view(classOf[ExecutorStageSummaryWrapper]) .index("stage") .first(Array(stageId, stageAttempt)) From 7caf9d1d77942010bfe02aa71622ec5baef420e7 Mon Sep 17 00:00:00 2001 From: wuyi Date: Mon, 14 Oct 2019 22:25:06 +0800 Subject: [PATCH 19/21] recover dead executor --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 3 +++ core/src/main/scala/org/apache/spark/status/storeTypes.scala | 1 + 2 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index abd3243e51590..6411d687e1159 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -114,6 +114,9 @@ private[spark] class AppStatusListener( kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(!_.info.isActive) + .map(_.toLiveExecutor).foreach(exec => deadExecutors.put(exec.executorId, exec)) + kvstore.view(classOf[StageDataWrapper]).asScala .filter { stageData => stageData.info.status == v1.StageStatus.PENDING || diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 937e16ba36921..490c215702e59 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -66,6 +66,7 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { val liveExecutor = new LiveExecutor(info.id, info.addTime.getTime) liveExecutor.hostPort = info.hostPort liveExecutor.host = info.hostPort.split(":")(0) + liveExecutor.isActive = info.isActive liveExecutor.totalCores = info.totalCores liveExecutor.rddBlocks = info.rddBlocks liveExecutor.memoryUsed = info.memoryUsed From c3ca273779390c68adec49d787071ca8ddc23b89 Mon Sep 17 00:00:00 2001 From: wuyi Date: Mon, 14 Oct 2019 22:43:13 +0800 Subject: [PATCH 20/21] enhance condition for live stage --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 6411d687e1159..860ba34f8b977 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -120,7 +120,8 @@ private[spark] class AppStatusListener( kvstore.view(classOf[StageDataWrapper]).asScala .filter { stageData => stageData.info.status == v1.StageStatus.PENDING || - stageData.info.status == v1.StageStatus.ACTIVE + stageData.info.status == v1.StageStatus.ACTIVE || + (stageData.info.numActiveTasks > 0 && stageData.info.status != v1.StageStatus.SKIPPED) }.map { stageData => val stageId = stageData.info.stageId val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq From 46715b3d2c52acd97fdca2e2034393f7e49e1b4f Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 15 Oct 2019 23:24:20 +0800 Subject: [PATCH 21/21] add private[spark] to toLiveRDDDistribution() --- core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 7e452fdd40aff..a69594f6d6b2f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -197,7 +197,7 @@ class RDDDataDistribution private[spark]( @JsonDeserialize(contentAs = classOf[JLong]) val offHeapMemoryRemaining: Option[Long]) { - def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor]) + private[spark] def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor]) : LiveRDDDistribution = { val exec = executors.get(executorId).get val liveRDDDistribution = new LiveRDDDistribution(exec)