From 704c6ca3c3db7b02c38edbe717df46fc63b1d3e4 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 3 Jan 2020 12:59:03 +0900 Subject: [PATCH 1/7] [SPARK-29779][CORE] Compact old event log files and cleanup - part 1 --- ...he.spark.deploy.history.EventFilterBuilder | 1 + .../history/BasicEventFilterBuilder.scala | 176 +++++++++++++ .../spark/deploy/history/EventFilter.scala | 109 ++++++++ .../history/EventLogFileCompactor.scala | 212 ++++++++++++++++ .../deploy/history/EventLogFileReaders.scala | 28 ++- .../deploy/history/EventLogFileWriters.scala | 28 ++- .../spark/internal/config/package.scala | 18 ++ .../BasicEventFilterBuilderSuite.scala | 236 ++++++++++++++++++ .../history/BasicEventFilterSuite.scala | 197 +++++++++++++++ .../history/EventLogFileCompactorSuite.scala | 233 +++++++++++++++++ .../history/EventLogFileReadersSuite.scala | 6 +- .../history/EventLogFileWritersSuite.scala | 4 +- .../deploy/history/EventLogTestHelper.scala | 78 +++++- .../FilteredEventLogFileRewriterSuite.scala | 90 +++++++ .../spark/status/AppStatusListenerSuite.scala | 38 +-- .../status/ListenerEventsTestHelper.scala | 201 +++++++++++++++ 16 files changed, 1597 insertions(+), 58 deletions(-) create mode 100644 core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder new file mode 100644 index 0000000000000..784e58270ab42 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder @@ -0,0 +1 @@ +org.apache.spark.deploy.history.BasicEventFilterBuilder \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala new file mode 100644 index 0000000000000..fa51c1b0bc8b6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
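The single-line service registration above is what lets the compaction code discover filter builders without hard-coding them. Later in this patch, `EventLogFileCompactor.initializeBuilders` loads every registered implementation through the standard JDK `ServiceLoader`; a minimal sketch of that lookup (illustration only, not additional patch content):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.util.Utils

// Every class named in META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder
// is instantiated; BasicEventFilterBuilder is the only entry registered by this patch.
val builders: Seq[EventFilterBuilder] =
  ServiceLoader.load(classOf[EventFilterBuilder], Utils.getContextOrSparkClassLoader)
    .asScala.toSeq
```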
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable + +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ + +/** + * This class tracks both live jobs and live executors, and pass the list to the + * [[BasicEventFilter]] to help BasicEventFilter to reject finished jobs (+ stages/tasks/RDDs) + * and dead executors. + */ +private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder { + private val _liveJobToStages = new mutable.HashMap[Int, Seq[Int]] + private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]] + private val _stageToRDDs = new mutable.HashMap[Int, Seq[Int]] + private val _liveExecutors = new mutable.HashSet[String] + + private var totalJobs: Long = 0L + private var totalStages: Long = 0L + private var totalTasks: Long = 0L + + def liveJobToStages: Map[Int, Seq[Int]] = _liveJobToStages.toMap + def stageToTasks: Map[Int, Set[Long]] = _stageToTasks.mapValues(_.toSet).toMap + def stageToRDDs: Map[Int, Seq[Int]] = _stageToRDDs.toMap + def liveExecutors: Set[String] = _liveExecutors.toSet + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + totalJobs += 1 + totalStages += jobStart.stageIds.length + _liveJobToStages += jobStart.jobId -> jobStart.stageIds + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int]) + _liveJobToStages -= jobEnd.jobId + _stageToTasks --= stages + _stageToRDDs --= stages + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + _stageToRDDs.getOrElseUpdate(stageSubmitted.stageInfo.stageId, + stageSubmitted.stageInfo.rddInfos.map(_.id)) + } + + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + totalTasks += 1 + val curTasks = _stageToTasks.getOrElseUpdate(taskStart.stageId, + mutable.HashSet[Long]()) + curTasks += taskStart.taskInfo.taskId + } + + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + _liveExecutors += executorAdded.executorId + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + _liveExecutors -= executorRemoved.executorId + } + + override def createFilter(): EventFilter = new BasicEventFilter(this) + + def statistics(): FilterStatistics = { + FilterStatistics(totalJobs, liveJobToStages.size, totalStages, + liveJobToStages.map(_._2.size).sum, totalTasks, _stageToTasks.map(_._2.size).sum) + } +} + +/** + * This class provides the functionality to reject events which are related to the finished + * jobs based on the given information. This class only deals with job related events, and provides + * a PartialFunction which returns false for rejected events for finished jobs, returns true + * otherwise. 
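Before the `JobEventFilter` definition below, it may help to see the builder's lifecycle end to end. A hedged usage sketch (editor's illustration, not part of the patch): events are replayed into the builder through the ordinary `SparkListener` callbacks, and `createFilter()` then produces a `BasicEventFilter` that only considers still-live jobs and executors.

```scala
import org.apache.spark.scheduler.{JobSucceeded, SparkListenerJobEnd, SparkListenerJobStart}

val builder = new BasicEventFilterBuilder
// Job 1 both starts and ends during replay, so nothing about it stays "live".
builder.onJobStart(SparkListenerJobStart(1, 0L, Seq.empty))
builder.onJobEnd(SparkListenerJobEnd(1, 1L, JobSucceeded))

val filter = builder.createFilter()
// Yields Some(false): job 1 is finished, so its events can be dropped when compacting.
val decision = filter.acceptFn().lift(SparkListenerJobEnd(1, 1L, JobSucceeded))
```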
+ */ +private[spark] abstract class JobEventFilter( + stats: Option[FilterStatistics], + jobToStages: Map[Int, Seq[Int]], + stageToTasks: Map[Int, Set[Long]], + stageToRDDs: Map[Int, Seq[Int]]) extends EventFilter with Logging { + + private val liveTasks: Set[Long] = stageToTasks.values.flatten.toSet + private val liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet + + logDebug(s"jobs : ${jobToStages.keySet}") + logDebug(s"stages in jobs : ${jobToStages.values.flatten}") + logDebug(s"stages : ${stageToTasks.keySet}") + logDebug(s"tasks in stages : ${stageToTasks.values.flatten}") + logDebug(s"RDDs in stages : ${stageToRDDs.values.flatten}") + + override def statistics(): Option[FilterStatistics] = stats + + protected val acceptFnForJobEvents: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerStageCompleted => + stageToTasks.contains(e.stageInfo.stageId) + + case e: SparkListenerStageSubmitted => + stageToTasks.contains(e.stageInfo.stageId) + + case e: SparkListenerTaskStart => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerTaskGettingResult => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerTaskEnd => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerJobStart => + jobToStages.contains(e.jobId) + + case e: SparkListenerJobEnd => + jobToStages.contains(e.jobId) + + case e: SparkListenerUnpersistRDD => + liveRDDs.contains(e.rddId) + + case e: SparkListenerExecutorMetricsUpdate => + e.accumUpdates.exists { case (_, stageId, _, _) => + stageToTasks.contains(stageId) + } + + case e: SparkListenerSpeculativeTaskSubmitted => + stageToTasks.contains(e.stageId) + } +} + +/** + * This class rejects events which are related to the finished jobs or dead executors, + * based on the given information. The events which are not related to the job and executor + * will be considered as "Don't mind". + */ +private[spark] class BasicEventFilter( + _stats: FilterStatistics, + _liveJobToStages: Map[Int, Seq[Int]], + _stageToTasks: Map[Int, Set[Long]], + _stageToRDDs: Map[Int, Seq[Int]], + liveExecutors: Set[String]) + extends JobEventFilter(Some(_stats), _liveJobToStages, _stageToTasks, _stageToRDDs) with Logging { + + def this(builder: BasicEventFilterBuilder) = { + this(builder.statistics(), builder.liveJobToStages, builder.stageToTasks, builder.stageToRDDs, + builder.liveExecutors) + } + + logDebug(s"live executors : $liveExecutors") + + private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerExecutorAdded => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId) + } + + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + _acceptFn.orElse(acceptFnForJobEvents) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala new file mode 100644 index 0000000000000..edbb34bff77d8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
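Before moving on to `EventFilter.scala`, note that the `orElse` at the end of `BasicEventFilter` above is what layers the two concerns: executor events are decided by the executor branch, job/stage/task/RDD events fall through to `acceptFnForJobEvents`, and everything else stays unmatched. A small self-contained sketch of that composition (assumed values, not patch code):

```scala
import org.apache.spark.scheduler._

val liveExecutors = Set("2")
val executorFn: PartialFunction[SparkListenerEvent, Boolean] = {
  case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId)
}
val jobFn: PartialFunction[SparkListenerEvent, Boolean] = {
  case _: SparkListenerJobEnd => false  // pretend every job seen here has finished
}

// Executor events are answered by the first branch, job-end events by the second,
// and anything neither branch matches yields None once the function is lifted.
val combined = executorFn.orElse(jobFn).lift
```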
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.io.{Codec, Source} +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse + +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.util.{JsonProtocol, Utils} + +/** + * EventFilterBuilder provides the interface to gather the information from events being received + * by [[SparkListenerInterface]], and create a new [[EventFilter]] instance which leverages + * information gathered to decide whether the event should be accepted or not. + */ +private[spark] trait EventFilterBuilder extends SparkListenerInterface { + def createFilter(): EventFilter +} + +/** [[EventFilter]] decides whether the given event should be accepted or rejected. */ +private[spark] trait EventFilter { + /** + * Provide statistic information of event filter, which would be used for measuring the score + * of compaction. + * + * To simplify the condition, currently the fields of statistic are static, since major kinds of + * events compaction would filter out are job related event types. If the filter doesn't track + * with job related events, return None instead. + */ + def statistics(): Option[FilterStatistics] + + /** + * Classify whether the event is accepted or rejected by this filter. + * + * The method should return the partial function which matches the events where the filter can + * decide whether the event should be accepted or rejected. Otherwise it should leave the events + * be unmatched. 
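In other words, callers never see a bare boolean; they lift the partial function and read one of three outcomes. A hedged sketch of a filter that only has an opinion about application-level events, mirroring the test filters added later in this patch (assumes the code sits in the same `history` package, since `EventFilter` is `private[spark]`):

```scala
import org.apache.spark.scheduler._

val appEventsOnly = new EventFilter {
  override def statistics(): Option[EventFilter.FilterStatistics] = None
  override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = {
    case _: SparkListenerApplicationStart => true
    case _: SparkListenerApplicationEnd => false
  }
}

val decide = appEventsOnly.acceptFn().lift
// Some(true)  -> accept the event, Some(false) -> reject it,
// None        -> this filter has no opinion; other filters (if any) decide.
```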
+ */ + def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] +} + +object EventFilter extends Logging { + case class FilterStatistics( + totalJobs: Long, + liveJobs: Long, + totalStages: Long, + liveStages: Long, + totalTasks: Long, + liveTasks: Long) + + def applyFilterToFile( + fs: FileSystem, + filters: Seq[EventFilter], + path: Path, + onAccepted: (String, SparkListenerEvent) => Unit, + onRejected: (String, SparkListenerEvent) => Unit, + onUnidentified: String => Unit): Unit = { + Utils.tryWithResource(EventLogFileReader.openEventLog(path, fs)) { in => + val lines = Source.fromInputStream(in)(Codec.UTF8).getLines() + + lines.zipWithIndex.foreach { case (line, lineNum) => + try { + val event = try { + Some(JsonProtocol.sparkEventFromJson(parse(line))) + } catch { + // ignore any exception occurred from unidentified json + case NonFatal(_) => + onUnidentified(line) + None + } + + event.foreach { e => + val results = filters.flatMap(_.acceptFn().lift.apply(e)) + if (results.isEmpty || !results.contains(false)) { + onAccepted(line, e) + } else { + onRejected(line, e) + } + } + } catch { + case e: Exception => + logError(s"Exception parsing Spark event log: ${path.getName}", e) + logError(s"Malformed line #$lineNum: $line\n") + throw e + } + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala new file mode 100644 index 0000000000000..686ed7cba0a49 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.net.URI +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler.ReplayListenerBus +import org.apache.spark.util.Utils + +/** + * This class compacts the old event log files into one compact file, via two phases reading: + * + * 1) Initialize available [[EventFilterBuilder]] instances, and replay the old event log files with + * builders, so that these builders can gather the information to create [[EventFilter]] instances. + * 2) Initialize [[EventFilter]] instances from [[EventFilterBuilder]] instances, and replay the + * old event log files with filters. 
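The acceptance rule inside `applyFilterToFile` above deserves spelling out: a line survives unless at least one filter explicitly returns false, so abstentions and an empty decision list both count as acceptance. A standalone restatement of just that predicate (editor's sketch, equivalent to the `results.isEmpty || !results.contains(false)` check):

```scala
import org.apache.spark.scheduler.SparkListenerEvent

def accepted(filters: Seq[EventFilter], event: SparkListenerEvent): Boolean = {
  val decisions = filters.flatMap(_.acceptFn().lift(event))
  decisions.isEmpty || !decisions.contains(false)
}
```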
Rewrite the events to the compact file which the filters decide + * to accept. + * + * This class will calculate the score based on statistic from [[EventFilter]] instances, which + * represents approximate rate of filtered-out events. Score is being calculated via applying + * heuristic; task events tend to take most size in event log. + * + * This class assumes caller will provide the sorted list of files which are sorted by the index of + * event log file, with "at most" one compact file placed first if it exists. Caller should keep in + * mind that this class doesn't care about the semantic of ordering. + * + * When compacting the files, the range of compaction for given file list is determined as: + * (first ~ the file where there're `maxFilesToRetain` files on the right side) + * + * If there're not enough files on the range of compaction, compaction will be skipped. + */ +class EventLogFileCompactor( + sparkConf: SparkConf, + hadoopConf: Configuration, + fs: FileSystem) extends Logging { + private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN) + private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD) + + def compact(eventLogFiles: Seq[FileStatus]): (CompactionResult.Value, Option[Long]) = { + assertPrecondition(eventLogFiles) + + if (eventLogFiles.length < maxFilesToRetain) { + return (CompactionResult.NOT_ENOUGH_FILES, None) + } + + val filesToCompact = findFilesToCompact(eventLogFiles) + if (filesToCompact.isEmpty) { + (CompactionResult.NOT_ENOUGH_FILES, None) + } else { + val builders = initializeBuilders(fs, filesToCompact.map(_.getPath)) + + val filters = builders.map(_.createFilter()) + val minScore = filters.flatMap(_.statistics()).map(calculateScore).min + + if (minScore < compactionThresholdScore) { + (CompactionResult.LOW_SCORE_FOR_COMPACTION, None) + } else { + val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) + rewriter.rewrite(filesToCompact) + cleanupCompactedFiles(filesToCompact) + (CompactionResult.SUCCESS, Some(RollingEventLogFilesWriter.getEventLogFileIndex( + filesToCompact.last.getPath.getName))) + } + } + } + + private def assertPrecondition(eventLogFiles: Seq[FileStatus]): Unit = { + val idxCompactedFiles = eventLogFiles.zipWithIndex.filter { case (file, _) => + EventLogFileWriter.isCompacted(file.getPath) + } + require(idxCompactedFiles.size < 2 && idxCompactedFiles.headOption.forall(_._2 == 0), + "The number of compact files should be at most 1, and should be placed first if exists.") + } + + /** + * Loads all available EventFilterBuilders in classloader via ServiceLoader, and initializes + * them via replaying events in given files. + */ + private def initializeBuilders(fs: FileSystem, files: Seq[Path]): Seq[EventFilterBuilder] = { + val bus = new ReplayListenerBus() + + val builders = ServiceLoader.load(classOf[EventFilterBuilder], + Utils.getContextOrSparkClassLoader).asScala.toSeq + builders.foreach(bus.addListener) + + files.foreach { log => + Utils.tryWithResource(EventLogFileReader.openEventLog(log, fs)) { in => + bus.replay(in, log.getName) + } + } + + builders + } + + private def calculateScore(stats: FilterStatistics): Double = { + // For now it's simply measuring how many task events will be filtered out (rejected) + // but it can be sophisticated later once we get more heuristic information and found + // the case where this simple calculation doesn't work. 
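As a worked example of the heuristic computed by the expression just below (editor's illustration): with 10 total tasks of which 3 still belong to live jobs, the score is (10 - 3) / 10 = 0.7, which meets the default `spark.eventLog.rolling.compaction.score.threshold` of 0.7 introduced in this patch, so compaction proceeds; anything below that ratio is skipped as `LOW_SCORE_FOR_COMPACTION`.

```scala
import org.apache.spark.deploy.history.EventFilter.FilterStatistics

// Hypothetical statistics: 7 of 10 tasks belong to finished jobs and are droppable.
val stats = FilterStatistics(totalJobs = 3, liveJobs = 1, totalStages = 5, liveStages = 2,
  totalTasks = 10, liveTasks = 3)
val score = (stats.totalTasks - stats.liveTasks) * 1.0 / stats.totalTasks  // 0.7
```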
+ (stats.totalTasks - stats.liveTasks) * 1.0 / stats.totalTasks + } + + private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = { + files.foreach { file => + var deleted = false + try { + deleted = fs.delete(file.getPath, true) + } catch { + case _: IOException => + } + if (!deleted) { + logWarning(s"Failed to remove ${file.getPath} / skip removing.") + } + } + } + + private def findFilesToCompact(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val numNormalEventLogFiles = { + if (EventLogFileWriter.isCompacted(eventLogFiles.head.getPath)) { + eventLogFiles.length - 1 + } else { + eventLogFiles.length + } + } + + // This avoids compacting only compact file. + if (numNormalEventLogFiles > maxFilesToRetain) { + eventLogFiles.dropRight(maxFilesToRetain) + } else { + Seq.empty + } + } +} + +object CompactionResult extends Enumeration { + val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value +} + +/** + * This class rewrites the event log files into one compact file: the compact file will only + * contain the events which pass the filters. Events will be dropped only when all filters + * decide to reject the event or don't mind about the event. Otherwise, the original line for + * the event is written to the compact file as it is. + */ +class FilteredEventLogFileRewriter( + sparkConf: SparkConf, + hadoopConf: Configuration, + fs: FileSystem, + filters: Seq[EventFilter]) { + + def rewrite(eventLogFiles: Seq[FileStatus]): String = { + require(eventLogFiles.nonEmpty) + + val lastIndexEventLogPath = eventLogFiles.last.getPath + val logWriter = new CompactedEventLogFileWriter(lastIndexEventLogPath, "dummy", None, + lastIndexEventLogPath.getParent.toUri, sparkConf, hadoopConf) + + logWriter.start() + eventLogFiles.foreach { file => + EventFilter.applyFilterToFile(fs, filters, file.getPath, + onAccepted = (line, _) => logWriter.writeEvent(line, flushLogger = true), + onRejected = (_, _) => {}, + onUnidentified = line => logWriter.writeEvent(line, flushLogger = true) + ) + } + logWriter.stop() + + logWriter.logPath + } +} + +/** + * This class helps to write compact file; to avoid reimplementing everything, it extends + * [[SingleEventLogFileWriter]], but only `originalFilePath` is used to determine the + * path of compact file. 
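Putting the selection rule from `findFilesToCompact` together with the naming handled by `CompactedEventLogFileWriter` (a hedged example; the file names below are illustrative, following the rolling writer's `events_<index>_<appId>` convention): with `spark.eventLog.rolling.maxFilesToRetain = 3` and five rolled files, the two oldest are rewritten into a single compact file named after the last of them, and the three newest are left untouched.

```scala
// Oldest to newest; names illustrative.
val eventLogFiles = Seq("events_1_app", "events_2_app", "events_3_app",
  "events_4_app", "events_5_app")
val maxFilesToRetain = 3
val filesToCompact = eventLogFiles.dropRight(maxFilesToRetain)
// => Seq("events_1_app", "events_2_app"); accepted lines are rewritten into
// "events_2_app" + EventLogFileWriter.COMPACTED (i.e. "events_2_app.compact"),
// and the two original files are deleted afterwards.
```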
+ */ +class CompactedEventLogFileWriter( + originalFilePath: Path, + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala index c8956ed3d423d..9f63a6441a838 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala @@ -173,7 +173,8 @@ class SingleFileEventLogFileReader( override def fileSizeForLastIndex: Long = status.getLen - override def completed: Boolean = !rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) + override def completed: Boolean = !rootPath.getName.stripSuffix(EventLogFileWriter.COMPACTED) + .endsWith(EventLogFileWriter.IN_PROGRESS) override def fileSizeForLastIndexForDFS: Option[Long] = { if (completed) { @@ -218,15 +219,23 @@ class RollingEventLogFilesFileReader( private lazy val eventLogFiles: Seq[FileStatus] = { val eventLogFiles = files.filter(isEventLogFile).sortBy { status => - getIndex(status.getPath.getName) + val filePath = status.getPath + var idx = getEventLogFileIndex(filePath.getName).toDouble + // trick to place compacted file later than normal file if index is same. + if (EventLogFileWriter.isCompacted(filePath)) { + idx += 0.1 + } + idx } - val indices = eventLogFiles.map { file => getIndex(file.getPath.getName) }.sorted + val filesToRead = dropBeforeLastCompactFile(eventLogFiles) + val indices = filesToRead.map { file => getEventLogFileIndex(file.getPath.getName) } require((indices.head to indices.last) == indices, "Found missing event log file, expected" + - s" indices: ${(indices.head to indices.last)}, actual: ${indices}") - eventLogFiles + s" indices: ${indices.head to indices.last}, actual: ${indices}") + filesToRead } - override def lastIndex: Option[Long] = Some(getIndex(lastEventLogFile.getPath.getName)) + override def lastIndex: Option[Long] = Some( + getEventLogFileIndex(lastEventLogFile.getPath.getName)) override def fileSizeForLastIndex: Long = lastEventLogFile.getLen @@ -261,4 +270,11 @@ class RollingEventLogFilesFileReader( override def totalSize: Long = eventLogFiles.map(_.getLen).sum private def lastEventLogFile: FileStatus = eventLogFiles.last + + private def dropBeforeLastCompactFile(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val lastCompactedFileIdx = eventLogFiles.lastIndexWhere { fs => + EventLogFileWriter.isCompacted(fs.getPath) + } + eventLogFiles.drop(lastCompactedFileIdx) + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 3fa5ef94892aa..1d58d054b7825 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -113,9 +113,9 @@ abstract class EventLogFileWriter( } } - protected def writeJson(json: String, flushLogger: Boolean = false): Unit = { + protected def writeLine(line: String, flushLogger: Boolean = false): Unit = { // scalastyle:off println - writer.foreach(_.println(json)) + writer.foreach(_.println(line)) // scalastyle:on 
println if (flushLogger) { writer.foreach(_.flush()) @@ -164,6 +164,7 @@ abstract class EventLogFileWriter( object EventLogFileWriter { // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" + val COMPACTED = ".compact" val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) @@ -192,9 +193,11 @@ object EventLogFileWriter { def codecName(log: Path): Option[String] = { // Compression codec is encoded as an extension, e.g. app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) + val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS) logName.split("\\.").tail.lastOption } + + def isCompacted(log: Path): Boolean = log.getName.endsWith(COMPACTED) } /** @@ -211,7 +214,7 @@ class SingleEventLogFileWriter( override val logPath: String = SingleEventLogFileWriter.getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) - private val inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS + protected def inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS override def start(): Unit = { requireLogBaseDirAsDirectory() @@ -222,7 +225,7 @@ class SingleEventLogFileWriter( } override def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit = { - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } /** @@ -327,10 +330,11 @@ class RollingEventLogFilesWriter( } } - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } - private def rollEventLogFile(): Unit = { + /** exposed for testing only */ + private[history] def rollEventLogFile(): Unit = { closeWriter() index += 1 @@ -399,16 +403,20 @@ object RollingEventLogFilesWriter { status.isDirectory && status.getPath.getName.startsWith(EVENT_LOG_DIR_NAME_PREFIX) } + def isEventLogFile(fileName: String): Boolean = { + fileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + } + def isEventLogFile(status: FileStatus): Boolean = { - status.isFile && status.getPath.getName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + status.isFile && isEventLogFile(status.getPath.getName) } def isAppStatusFile(status: FileStatus): Boolean = { status.isFile && status.getPath.getName.startsWith(APPSTATUS_FILE_NAME_PREFIX) } - def getIndex(eventLogFileName: String): Long = { - require(eventLogFileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX), "Not an event log file!") + def getEventLogFileIndex(eventLogFileName: String): Long = { + require(isEventLogFile(eventLogFileName), "Not an event log file!") val index = eventLogFileName.stripPrefix(EVENT_LOG_FILE_NAME_PREFIX).split("_")(0) index.toLong } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0e872be9d3c28..2d5d7a5a05701 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -195,6 +195,24 @@ package object config { "configured to be at least 10 MiB.") .createWithDefaultString("128m") + private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN = + ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain") + // TODO: remove this when integrating compactor with FsHistoryProvider + .internal() + .doc("The maximum number of event log files which will be retained as non-compacted. " + + "By default, all event log files will be retained. 
Please set the configuration " + + s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " + + "the overall size of event log files.") + .intConf + .checkValue(_ > 0, "Max event log files to retain should be higher than 0.") + .createWithDefault(Integer.MAX_VALUE) + + private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD = + ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold") + .internal() + .doubleConf + .createWithDefault(0.7d) + private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").stringConf.createOptional diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala new file mode 100644 index 0000000000000..ca570c257c025 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{SparkFunSuite, Success, TaskResultLost, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper + +class BasicEventFilterBuilderSuite extends SparkFunSuite { + import ListenerEventsTestHelper._ + + override protected def beforeEach(): Unit = { + ListenerEventsTestHelper.reset() + } + + test("track live jobs") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. 
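(Stepping back briefly to the two configuration entries added above in `config/package.scala`: both are internal for now, but they can be set like any other Spark configuration. Illustrative values below; the compactor test suite later in this patch sets the typed constants directly on `SparkConf`.)

```scala
import org.apache.spark.SparkConf

// Keys exactly as registered by this patch; values are illustrative.
val conf = new SparkConf()
  .set("spark.eventLog.rolling.maxFilesToRetain", "3")
  .set("spark.eventLog.rolling.compaction.score.threshold", "0.7")
```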
+ time += 1 + val execIds = Array("1", "2") + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // Start a job with 2 stages / 4 tasks each + time += 1 + + val rddsForStage1 = createRdds(2) + val rddsForStage2 = createRdds(2) + + val stage1 = createStage(rddsForStage1, Nil) + val stage2 = createStage(rddsForStage2, Seq(stage1.stageId)) + val stages = Seq(stage1, stage2) + + val jobProps = createJobProps() + listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps)) + + // Submit stage 1 + time += 1 + stages.head.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps)) + + // Start tasks from stage 1 + time += 1 + + val s1Tasks = ListenerEventsTestHelper.createTasks(4, execIds, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, + stages.head.attemptNumber(), task)) + } + + // Fail one of the tasks, re-start it. + time += 1 + s1Tasks.head.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", TaskResultLost, s1Tasks.head, new ExecutorMetrics, null)) + + time += 1 + val reattempt = createTaskWithNewAttempt(s1Tasks.head, time) + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber, + reattempt)) + + // Succeed all tasks in stage 1. + val pending = s1Tasks.drop(1) ++ Seq(reattempt) + + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) + + time += 1 + pending.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", Success, task, new ExecutorMetrics, s1Metrics)) + } + + // End stage 1. + time += 1 + stages.head.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) + + assert(listener.liveJobToStages.keys.toSeq === Seq(1)) + assert(listener.liveJobToStages(1) === Seq(0, 1)) + assert(listener.stageToRDDs.keys.toSeq === Seq(0)) + assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) + // stage 1 not yet submitted + assert(listener.stageToTasks.keys.toSeq === Seq(0)) + assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + + // Submit stage 2. + time += 1 + stages.last.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, jobProps)) + + // Start and fail all tasks of stage 2. + time += 1 + val s2Tasks = createTasks(4, execIds, time) + s2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, + stages.last.attemptNumber, + task)) + } + + time += 1 + s2Tasks.foreach { task => + task.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber, + "taskType", TaskResultLost, task, new ExecutorMetrics, null)) + } + + // Fail stage 2. + time += 1 + stages.last.completionTime = Some(time) + stages.last.failureReason = Some("uh oh") + listener.onStageCompleted(SparkListenerStageCompleted(stages.last)) + + // - Re-submit stage 2, all tasks, and succeed them and the stage. 
+ val oldS2 = stages.last + val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks, + oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics) + + time += 1 + newS2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps)) + + val newS2Tasks = createTasks(4, execIds, time) + + newS2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task)) + } + + time += 1 + newS2Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType", + Success, task, new ExecutorMetrics, null)) + } + + time += 1 + newS2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(newS2)) + + assert(listener.liveJobToStages.keys.toSeq === Seq(1)) + assert(listener.liveJobToStages(1) === Seq(0, 1)) + assert(listener.stageToRDDs.keys === Set(0, 1)) + assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) + assert(listener.stageToRDDs(1) === rddsForStage2.map(_.id)) + assert(listener.stageToTasks.keys.toSet === Set(0, 1)) + // stage 0 is finished but it stores the information regarding stage + assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + // stage 1 is newly added + assert(listener.stageToTasks(1) === (s2Tasks ++ newS2Tasks).map(_.taskId).toSet) + + // Start next job. + time += 1 + + val rddsForStage3 = createRdds(2) + val rddsForStage4 = createRdds(2) + + val stage3 = createStage(rddsForStage3, Nil) + val stage4 = createStage(rddsForStage4, Seq(stage3.stageId)) + val stagesForJob2 = Seq(stage3, stage4) + + listener.onJobStart(SparkListenerJobStart(2, time, stagesForJob2, jobProps)) + + // End job 1. + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + // everything related to job 1 should be cleaned up, but not for job 2 + assert(listener.liveJobToStages.keys.toSet === Set(2)) + assert(listener.stageToRDDs.isEmpty) + // stageToTasks has no information for job 2, as no task has been started + assert(listener.stageToTasks.isEmpty) + } + + test("track live executors") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. + time += 1 + val execIds = (1 to 3).map(_.toString) + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // End one of executors. + time += 1 + listener.onExecutorRemoved(createExecutorRemovedEvent(execIds.head, time)) + + assert(listener.liveExecutors === execIds.drop(1).toSet) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala new file mode 100644 index 0000000000000..27f2c4bca5c92 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{storage, SparkFunSuite, Success, TaskState} +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper._ +import org.apache.spark.storage.{BlockManagerId, RDDBlockId, StorageLevel} + +class BasicEventFilterSuite extends SparkFunSuite { + import BasicEventFilterSuite._ + + test("filter out events for finished jobs") { + // assume finished job 1 with stage 1, tasks (1, 2), rdds (1, 2) + // live job 2 with stages 2, tasks (3, 4), rdds (3, 4) + val liveJobToStages: Map[Int, Seq[Int]] = Map(2 -> Seq(2, 3)) + val stageToTasks: Map[Int, Set[Long]] = Map(2 -> Set(3, 4), 3 -> Set(5, 6)) + val stageToRDDs: Map[Int, Seq[Int]] = Map(2 -> Seq(3, 4), 3 -> Seq(5, 6)) + val liveExecutors: Set[String] = Set("1", "2") + val filterStats = FilterStatistics(2, 1, 2, 1, 4, 2) + + val filter = new BasicEventFilter(filterStats, liveJobToStages, stageToTasks, stageToRDDs, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // Verifying with finished job 1 + val rddsForStage1 = createRddsWithId(1 to 2) + val stage1 = createStage(1, rddsForStage1, Nil) + val tasksForStage1 = createTasks(Seq(1L, 2L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob1 = SparkListenerJobStart(1, 0, Seq(stage1)) + val jobEndEventForJob1 = SparkListenerJobEnd(1, 0, JobSucceeded) + val stageSubmittedEventsForJob1 = SparkListenerStageSubmitted(stage1) + val stageCompletedEventsForJob1 = SparkListenerStageCompleted(stage1) + val unpersistRDDEventsForJob1 = (1 to 2).map(SparkListenerUnpersistRDD) + + // job events for finished job should be rejected + assertFilterJobEvents(acceptFn, jobStartEventForJob1, jobEndEventForJob1, Some(false)) + + // stage events for finished job should be rejected + // NOTE: it doesn't filter out stage events which are also related to the executor + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob1, + stageCompletedEventsForJob1, + unpersistRDDEventsForJob1, + SparkListenerSpeculativeTaskSubmitted(stage1.stageId, stageAttemptId = 1), + Some(false)) + + // task events for finished job should be rejected + assertFilterTaskEvents(acceptFn, tasksForStage1, stage1, Some(false)) + + // Verifying with live job 2 + val rddsForStage2 = createRddsWithId(3 to 4) + val stage2 = createStage(2, rddsForStage2, Nil) + val tasksForStage2 = createTasks(Seq(3L, 4L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob2 = SparkListenerJobStart(2, 0, Seq(stage2)) + val stageSubmittedEventsForJob2 = SparkListenerStageSubmitted(stage2) + val stageCompletedEventsForJob2 = SparkListenerStageCompleted(stage2) + val unpersistRDDEventsForJob2 = 
rddsForStage2.map { rdd => SparkListenerUnpersistRDD(rdd.id) } + + // job events for live job should be accepted + assert(acceptFn(jobStartEventForJob2) === Some(true)) + + // stage events for live job should be accepted + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob2, + stageCompletedEventsForJob2, + unpersistRDDEventsForJob2, + SparkListenerSpeculativeTaskSubmitted(stage2.stageId, stageAttemptId = 1), + Some(true)) + + // task events for live job should be accepted + assertFilterTaskEvents(acceptFn, tasksForStage2, stage2, Some(true)) + } + + test("filter out events for dead executors") { + // assume executor 1 was dead, and live executor 2 is available + val liveExecutors: Set[String] = Set("2") + + val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // events for dead executor should be rejected + assert(acceptFn(createExecutorAddedEvent(1)) === Some(false)) + // though the name of event is stage executor metrics, AppStatusListener only deals with + // live executors + assert(acceptFn( + SparkListenerStageExecutorMetrics(1.toString, 0, 0, new ExecutorMetrics)) === + Some(false)) + assert(acceptFn(SparkListenerExecutorBlacklisted(0, 1.toString, 1)) === + Some(false)) + assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 1.toString)) === + Some(false)) + assert(acceptFn(createExecutorRemovedEvent(1)) === Some(false)) + + // events for live executor should be accepted + assert(acceptFn(createExecutorAddedEvent(2)) === Some(true)) + assert(acceptFn( + SparkListenerStageExecutorMetrics(2.toString, 0, 0, new ExecutorMetrics)) === + Some(true)) + assert(acceptFn(SparkListenerExecutorBlacklisted(0, 2.toString, 1)) === + Some(true)) + assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 2.toString)) === + Some(true)) + assert(acceptFn(createExecutorRemovedEvent(2)) === Some(true)) + } + + test("other events should be left to other filters") { + def assertNone(predicate: => Option[Boolean]): Unit = { + assert(predicate === None) + } + + val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, Set.empty) + val acceptFn = filter.acceptFn().lift + + assertNone(acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) + assertNone(acceptFn(SparkListenerApplicationStart("1", Some("1"), 0, "user", None))) + assertNone(acceptFn(SparkListenerApplicationEnd(1))) + val bmId = BlockManagerId("1", "host1", 1) + assertNone(acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) + assertNone(acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + assertNone(acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) + assertNone(acceptFn(SparkListenerNodeBlacklisted(0, "host1", 1))) + assertNone(acceptFn(SparkListenerNodeUnblacklisted(0, "host1"))) + assertNone(acceptFn(SparkListenerLogStart("testVersion"))) + } + + private def assertFilterJobEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + jobStart: SparkListenerJobStart, + jobEnd: SparkListenerJobEnd, + expectedVal: Option[Boolean]): Unit = { + assert(acceptFn(jobStart) === expectedVal) + assert(acceptFn(jobEnd) === expectedVal) + } + + private def assertFilterStageEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + stageSubmitted: SparkListenerStageSubmitted, + stageCompleted: SparkListenerStageCompleted, + unpersistRDDs: Seq[SparkListenerUnpersistRDD], + taskSpeculativeSubmitted: SparkListenerSpeculativeTaskSubmitted, + expectedVal: 
Option[Boolean]): Unit = { + assert(acceptFn(stageSubmitted) === expectedVal) + assert(acceptFn(stageCompleted) === expectedVal) + unpersistRDDs.foreach { event => + assert(acceptFn(event) === expectedVal) + } + assert(acceptFn(taskSpeculativeSubmitted) === expectedVal) + } + + private def assertFilterTaskEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + taskInfos: Seq[TaskInfo], + stageInfo: StageInfo, + expectedVal: Option[Boolean]): Unit = { + taskInfos.foreach { task => + val taskStartEvent = SparkListenerTaskStart(stageInfo.stageId, 0, task) + assert(acceptFn(taskStartEvent) === expectedVal) + + val taskGettingResultEvent = SparkListenerTaskGettingResult(task) + assert(acceptFn(taskGettingResultEvent) === expectedVal) + + val taskEndEvent = SparkListenerTaskEnd(stageInfo.stageId, 0, "taskType", + Success, task, new ExecutorMetrics, null) + assert(acceptFn(taskEndEvent) === expectedVal) + } + } +} + +object BasicEventFilterSuite { + val EMPTY_STATS = FilterStatistics(0, 0, 0, 0, 0, 0) +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala new file mode 100644 index 0000000000000..8c216acb5267a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
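The compactor suite that follows exercises the public contract of `compact()`: it returns a `CompactionResult` value plus, on success, the index of the last event log file folded into the compact file. A hedged sketch of how a caller would consume that result (assumes `sparkConf`, `hadoopConf`, `fs`, and a sorted `eventLogFiles` list are already in scope):

```scala
val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
compactor.compact(eventLogFiles) match {
  case (CompactionResult.SUCCESS, Some(lastIndex)) =>
    // files up to and including index `lastIndex` now live in the ".compact" file
  case (CompactionResult.NOT_ENOUGH_FILES, _) =>
    // fewer files than maxFilesToRetain allows dropping; nothing was rewritten
  case (CompactionResult.LOW_SCORE_FOR_COMPACTION, _) =>
    // too few droppable events to justify rewriting
  case _ =>
}
```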
+ */ + +package org.apache.spark.deploy.history + +import scala.io.{Codec, Source} + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse + +import org.apache.spark.{SparkConf, SparkFunSuite, Success} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper.writeEventsToRollingWriter +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.status.ListenerEventsTestHelper._ +import org.apache.spark.util.{JsonProtocol, Utils} + +class EventLogFileCompactorSuite extends SparkFunSuite { + private val sparkConf = testSparkConf() + private val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + + test("No event log files") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + + assertNoCompaction(fs, Seq.empty, compactor.compact(Seq.empty), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("No compact file, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("No compact file, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 5).map(_ => testEvent): _*) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + expectedNumOfFilesCompacted = 2) + } + } + + test("compact file exists, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, number of origin files are same as max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 4).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = 
Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 10).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + expectedNumOfFilesCompacted = 7) + } + } + + test("events for finished job are dropped in new compact file") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + // 1, 2 will be compacted into one file, 3~5 are dummies to ensure max files to retain + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + Seq( + SparkListenerExecutorAdded(0, "exec1", new ExecutorInfo("host1", 1, Map.empty)), + SparkListenerJobStart(1, 0, Seq.empty)), + Seq( + SparkListenerJobEnd(1, 1, JobSucceeded), + SparkListenerExecutorAdded(2, "exec2", new ExecutorInfo("host2", 1, Map.empty))), + testEvent, + testEvent, + testEvent) + + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + expectedNumOfFilesCompacted = 2) + + val expectCompactFileBasePath = fileStatuses.take(2).last.getPath + val compactFilePath = getCompactFilePath(expectCompactFileBasePath) + Utils.tryWithResource(EventLogFileReader.openEventLog(compactFilePath, fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines().toList + assert(lines.length === 2, "Compacted file should have only two events being accepted") + lines.foreach { line => + val event = JsonProtocol.sparkEventFromJson(parse(line)) + assert(!event.isInstanceOf[SparkListenerJobStart] && + !event.isInstanceOf[SparkListenerJobEnd]) + } + } + } + } + + test("Don't compact file if score is lower than threshold") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + val newConf = sparkConf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.7d) + + // only one of two tasks is finished, which would score 0.5d + val tasks = createTasks(2, Array("exec1"), 0L).map(createTaskStartEvent(_, 1, 0)) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, newConf, hadoopConf, + tasks, + Seq(SparkListenerTaskEnd(1, 0, "taskType", Success, tasks.head.taskInfo, + new ExecutorMetrics, null)), + testEvent, + testEvent, + testEvent) + + val compactor = new EventLogFileCompactor(newConf, hadoopConf, fs) + assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + CompactionResult.LOW_SCORE_FOR_COMPACTION) + } + } + + private def assertCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: (CompactionResult.Value, Option[Long]), + expectedNumOfFilesCompacted: Int): Unit = { + assert(CompactionResult.SUCCESS === compactRet._1) + + val expectRetainedFiles = 
originalFiles.drop(expectedNumOfFilesCompacted) + expectRetainedFiles.foreach { status => assert(fs.exists(status.getPath)) } + + val expectRemovedFiles = originalFiles.take(expectedNumOfFilesCompacted) + expectRemovedFiles.foreach { status => assert(!fs.exists(status.getPath)) } + + val expectCompactFileBasePath = originalFiles.take(expectedNumOfFilesCompacted).last.getPath + val expectCompactFileIndex = RollingEventLogFilesWriter.getEventLogFileIndex( + expectCompactFileBasePath.getName) + assert(Some(expectCompactFileIndex) === compactRet._2) + + val expectCompactFilePath = getCompactFilePath(expectCompactFileBasePath) + assert(fs.exists(expectCompactFilePath)) + } + + private def getCompactFilePath(expectCompactFileBasePath: Path): Path = { + new Path(expectCompactFileBasePath.getParent, + expectCompactFileBasePath.getName + EventLogFileWriter.COMPACTED) + } + + private def assertNoCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: (CompactionResult.Value, Option[Long]), + expectedCompactRet: CompactionResult.Value): Unit = { + assert(compactRet._1 === expectedCompactRet) + assert(None === compactRet._2) + originalFiles.foreach { status => assert(fs.exists(status.getPath)) } + } + + private def testEvent: Seq[SparkListenerEvent] = + Seq(SparkListenerApplicationStart("app", Some("app"), 0, "user", None)) + + private def testSparkConf(): SparkConf = { + new SparkConf() + .set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3) + // to simplify the tests, we set the score threshold as 0.0d + // individual test can override the value to verify the functionality + .set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala index a2ce4acdaaf37..8eab2da1a37b7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala @@ -288,13 +288,15 @@ class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite { assert(status.isDirectory) val statusInDir = fileSystem.listStatus(logPath) - val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => getIndex(s.getPath.getName) } + val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => + getEventLogFileIndex(s.getPath.getName) + } assert(eventFiles.nonEmpty) val lastEventFile = eventFiles.last val allLen = eventFiles.map(_.getLen).sum assert(reader.rootPath === fileSystem.makeQualified(logPath)) - assert(reader.lastIndex === Some(getIndex(lastEventFile.getPath.getName))) + assert(reader.lastIndex === Some(getEventLogFileIndex(lastEventFile.getPath.getName))) assert(reader.fileSizeForLastIndex === lastEventFile.getLen) assert(reader.completed === isCompleted) assert(reader.modificationTime === lastEventFile.getModificationTime) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index c4b40884eebf5..060b878fb8ef2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -291,7 +291,7 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { expectedMaxSizeBytes: Long): Unit = { assert(eventLogFiles.forall(f => f.getLen <= expectedMaxSizeBytes)) 
assert((1 to expectedLastIndex) === - eventLogFiles.map(f => getIndex(f.getPath.getName))) + eventLogFiles.map(f => getEventLogFileIndex(f.getPath.getName))) } val appId = getUniqueApplicationId @@ -373,6 +373,6 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { private def listEventLogFiles(logDirPath: Path): Seq[FileStatus] = { fileSystem.listStatus(logDirPath).filter(isEventLogFile) - .sortBy { fs => getIndex(fs.getPath.getName) } + .sortBy { fs => getEventLogFileIndex(fs.getPath.getName) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala index 55eddce3968c2..85a2acb3477f1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala @@ -17,12 +17,17 @@ package org.apache.spark.deploy.history +import java.io.File import java.nio.charset.StandardCharsets -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkConf import org.apache.spark.internal.config._ +import org.apache.spark.scheduler._ +import org.apache.spark.util.JsonProtocol object EventLogTestHelper { def getUniqueApplicationId: String = "test-" + System.currentTimeMillis @@ -56,4 +61,75 @@ object EventLogTestHelper { eventStr } } + + def writeEventLogFile( + sparkConf: SparkConf, + hadoopConf: Configuration, + dir: File, + idx: Int, + events: Seq[SparkListenerEvent]): String = { + // to simplify the code, we don't concern about file name being matched with the naming rule + // of event log file + val writer = new SingleEventLogFileWriter(s"app$idx", None, dir.toURI, sparkConf, hadoopConf) + writer.start() + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + writer.stop() + writer.logPath + } + + def writeEventsToRollingWriter( + fs: FileSystem, + appId: String, + dir: File, + sparkConf: SparkConf, + hadoopConf: Configuration, + eventsFiles: Seq[SparkListenerEvent]*): Seq[FileStatus] = { + val writer = new RollingEventLogFilesWriter(appId, None, dir.toURI, sparkConf, hadoopConf) + writer.start() + + eventsFiles.dropRight(1).foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = true) + } + eventsFiles.lastOption.foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = false) + } + + writer.stop() + EventLogFileReader(fs, new Path(writer.logPath)).get.listEventLogFiles + } + + def writeEventsToRollingWriter( + writer: RollingEventLogFilesWriter, + events: Seq[SparkListenerEvent], + rollFile: Boolean): Unit = { + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + if (rollFile) writer.rollEventLogFile() + } + + def convertEvent(event: SparkListenerEvent): String = { + compact(render(JsonProtocol.sparkEventToJson(event))) + } + + class TestEventFilter1 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerBlockManagerAdded => true + case _: SparkListenerApplicationStart => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } + + class TestEventFilter2 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + 
case _: SparkListenerApplicationEnd => true + case _: SparkListenerEnvironmentUpdate => true + case _: SparkListenerNodeBlacklisted => true + case _: SparkListenerBlockManagerAdded => false + case _: SparkListenerApplicationStart => false + case _: SparkListenerNodeUnblacklisted => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala new file mode 100644 index 0000000000000..96883a7647b4e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable +import scala.io.{Codec, Source} + +import org.apache.hadoop.fs.Path + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper.{TestEventFilter1, TestEventFilter2} +import org.apache.spark.scheduler._ +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.Utils + +class FilteredEventLogFileRewriterSuite extends SparkFunSuite { + test("rewrite files with test filters") { + def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = { + val line = EventLogTestHelper.convertEvent(event) + writer.writeEvent(line, flushLogger = true) + line + } + + withTempDir { tempDir => + val sparkConf = new SparkConf + val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf) + + val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf) + writer.start() + + val expectedLines = new mutable.ArrayBuffer[String] + + // filterApplicationEnd: Some(true) & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0)) + + // filterBlockManagerAdded: Some(true) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1), + 10)) + + // filterApplicationStart: Some(false) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None)) + + // filterNodeBlacklisted: None & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1)) + + // filterNodeUnblacklisted: None & Some(false) => filter out + writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1")) + + // other events: None & None => filter in + expectedLines += 
writeEventToWriter(writer, SparkListenerUnpersistRDD(0)) + + writer.stop() + + val filters = Seq(new TestEventFilter1, new TestEventFilter2) + + val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) + val logPath = new Path(writer.logPath) + val newPath = rewriter.rewrite(Seq(fs.getFileStatus(logPath))) + assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED) + + Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines() + var linesLength = 0 + lines.foreach { line => + linesLength += 1 + assert(expectedLines.contains(line)) + } + assert(linesLength === expectedLines.length, "The number of lines for rewritten file " + + s"is not expected: expected ${expectedLines.length} / actual $linesLength") + } + } + } +} diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index a289dddbdc9e6..e7eed7bf4c879 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -32,12 +32,12 @@ import org.apache.spark.internal.config.Status._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster._ +import org.apache.spark.status.ListenerEventsTestHelper._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.util.Utils class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { - private val conf = new SparkConf() .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) .set(ASYNC_TRACKING_ENABLED, false) @@ -1694,40 +1694,4 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { def blockId: BlockId = RDDBlockId(rddId, partId) } - - /** Create a stage submitted event for the specified stage Id. */ - private def createStageSubmittedEvent(stageId: Int) = { - SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create a stage completed event for the specified stage Id. */ - private def createStageCompletedEvent(stageId: Int) = { - SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create an executor added event for the specified executor Id. */ - private def createExecutorAddedEvent(executorId: Int) = { - SparkListenerExecutorAdded(0L, executorId.toString, - new ExecutorInfo("host1", 1, Map.empty, Map.empty)) - } - - /** Create an executor added event for the specified executor Id. */ - private def createExecutorRemovedEvent(executorId: Int) = { - SparkListenerExecutorRemoved(10L, executorId.toString, "test") - } - - /** Create an executor metrics update event, with the specified executor metrics values. 
*/ - private def createExecutorMetricsUpdateEvent( - stageId: Int, - executorId: Int, - executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { - val taskMetrics = TaskMetrics.empty - taskMetrics.incDiskBytesSpilled(111) - taskMetrics.incMemoryBytesSpilled(222) - val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) - val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) - SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) - } } diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala new file mode 100644 index 0000000000000..37a35744ada5e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.Properties + +import scala.collection.immutable.Map + +import org.apache.spark.{AccumulatorSuite, SparkContext, Success, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality} +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.storage.{RDDInfo, StorageLevel} + +object ListenerEventsTestHelper { + + private var taskIdTracker = -1L + private var rddIdTracker = -1 + private var stageIdTracker = -1 + + def reset(): Unit = { + taskIdTracker = -1L + rddIdTracker = -1 + stageIdTracker = -1 + } + + def createJobProps(): Properties = { + val jobProps = new Properties() + jobProps.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, "jobDescription") + jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool") + jobProps + } + + def createRddsWithId(ids: Seq[Int]): Seq[RDDInfo] = { + ids.map { rddId => + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createRdds(count: Int): Seq[RDDInfo] = { + (1 to count).map { _ => + val rddId = nextRddId() + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createStage(id: Int, rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + new StageInfo(id, 0, s"stage${id}", 4, rdds, parentIds, s"details${id}") + } + + def createStage(rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + createStage(nextStageId(), rdds, parentIds) + } + + def createTasks(ids: 
Seq[Long], execs: Array[String], time: Long): Seq[TaskInfo] = { + ids.zipWithIndex.map { case (id, idx) => + val exec = execs(idx % execs.length) + new TaskInfo(id, idx, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, idx % 2 == 0) + } + } + + def createTasks(count: Int, execs: Array[String], time: Long): Seq[TaskInfo] = { + createTasks((1 to count).map { _ => nextTaskId() }, execs, time) + } + + def createTaskWithNewAttempt(orig: TaskInfo, time: Long): TaskInfo = { + // Task reattempts have a different ID, but the same index as the original. + new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId, + s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) + } + + def createTaskStartEvent( + taskInfo: TaskInfo, + stageId: Int, + attemptId: Int): SparkListenerTaskStart = { + SparkListenerTaskStart(stageId, attemptId, taskInfo) + } + + /** Create a stage submitted event for the specified stage Id. */ + def createStageSubmittedEvent(stageId: Int): SparkListenerStageSubmitted = { + SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + /** Create a stage completed event for the specified stage Id. */ + def createStageCompletedEvent(stageId: Int): SparkListenerStageCompleted = { + SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + def createExecutorAddedEvent(executorId: Int): SparkListenerExecutorAdded = { + createExecutorAddedEvent(executorId.toString, 0) + } + + /** Create an executor added event for the specified executor Id. */ + def createExecutorAddedEvent(executorId: String, time: Long): SparkListenerExecutorAdded = { + SparkListenerExecutorAdded(time, executorId, + new ExecutorInfo("host1", 1, Map.empty, Map.empty)) + } + + def createExecutorRemovedEvent(executorId: Int): SparkListenerExecutorRemoved = { + createExecutorRemovedEvent(executorId.toString, 10L) + } + + /** Create an executor added event for the specified executor Id. */ + def createExecutorRemovedEvent(executorId: String, time: Long): SparkListenerExecutorRemoved = { + SparkListenerExecutorRemoved(time, executorId, "test") + } + + /** Create an executor metrics update event, with the specified executor metrics values. 
*/ + def createExecutorMetricsUpdateEvent( + stageId: Int, + executorId: Int, + executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { + val taskMetrics = TaskMetrics.empty + taskMetrics.incDiskBytesSpilled(111) + taskMetrics.incMemoryBytesSpilled(222) + val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) + val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) + SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) + } + + case class JobInfo( + stageIds: Seq[Int], + stageToTaskIds: Map[Int, Seq[Long]], + stageToRddIds: Map[Int, Seq[Int]]) + + def pushJobEventsWithoutJobEnd( + listener: SparkListener, + jobId: Int, + jobProps: Properties, + execIds: Array[String], + time: Long): JobInfo = { + // Start a job with 1 stages / 4 tasks each + val rddsForStage = createRdds(2) + val stage = createStage(rddsForStage, Nil) + + listener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage), jobProps)) + + // Submit stage + stage.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, jobProps)) + + // Start tasks from stage + val s1Tasks = createTasks(4, execIds, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, + stage.attemptNumber(), task)) + } + + // Succeed all tasks in stage. + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) + + s1Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, + "taskType", Success, task, new ExecutorMetrics, s1Metrics)) + } + + // End stage. + stage.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage)) + + JobInfo(Seq(stage.stageId), Map(stage.stageId -> s1Tasks.map(_.taskId)), + Map(stage.stageId -> rddsForStage.map(_.id))) + } + + private def nextTaskId(): Long = { + taskIdTracker += 1 + taskIdTracker + } + + private def nextRddId(): Int = { + rddIdTracker += 1 + rddIdTracker + } + + private def nextStageId(): Int = { + stageIdTracker += 1 + stageIdTracker + } +} From c06575c8843e3655f32e391782f8ddb9d8899c2f Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 3 Jan 2020 16:46:58 +0900 Subject: [PATCH 2/7] Address events out of order --- .../history/BasicEventFilterBuilder.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala index fa51c1b0bc8b6..289e579266e97 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -76,11 +76,23 @@ private[spark] class BasicEventFilterBuilder extends SparkListener with EventFil _liveExecutors -= executorRemoved.executorId } - override def createFilter(): EventFilter = new BasicEventFilter(this) + override def createFilter(): EventFilter = { + cleanupInvalidStages() - def statistics(): FilterStatistics = { - FilterStatistics(totalJobs, liveJobToStages.size, totalStages, + val stats = FilterStatistics(totalJobs, liveJobToStages.size, totalStages, liveJobToStages.map(_._2.size).sum, totalTasks, _stageToTasks.map(_._2.size).sum) + + new BasicEventFilter(stats, liveJobToStages, stageToTasks, stageToRDDs, liveExecutors) + } 
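  // --------------------------------------------------------------------------------------
  // Editor's note (illustrative sketch, NOT part of this patch): the builder above is a
  // SparkListener, so a caller replays logged events through its callbacks and then
  // materializes the filter via createFilter(). The constructors below mirror the ones used
  // in the test suites of this patch; `time` is an arbitrary timestamp assumed to be in
  // scope, and the snippet is a usage sketch rather than the author's API contract.
  //
  //   val builder = new BasicEventFilterBuilder
  //   builder.onExecutorAdded(SparkListenerExecutorAdded(time, "exec1",
  //     new ExecutorInfo("host1", 1, Map.empty)))
  //   builder.onJobStart(SparkListenerJobStart(1, time, Seq.empty))
  //   builder.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
  //   val filter = builder.createFilter()
  //   // job 1 has finished, so its events are rejected; executor "exec1" is still live
  //   assert(filter.acceptFn().lift(SparkListenerJobEnd(1, time, JobSucceeded)) == Some(false))
  //   assert(filter.acceptFn().lift(
  //     SparkListenerExecutorAdded(time, "exec1", new ExecutorInfo("host1", 1, Map.empty)))
  //     == Some(true))
  // --------------------------------------------------------------------------------------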
+ + private def cleanupInvalidStages(): Unit = { + val allValidStages = liveJobToStages.flatMap { case (_, stages) => stages.toSet }.toSet + _stageToTasks.keySet.diff(allValidStages).foreach { stageId => + _stageToTasks.remove(stageId) + } + _stageToRDDs.keySet.diff(allValidStages).foreach { stageId => + _stageToRDDs.remove(stageId) + } } } @@ -155,11 +167,6 @@ private[spark] class BasicEventFilter( liveExecutors: Set[String]) extends JobEventFilter(Some(_stats), _liveJobToStages, _stageToTasks, _stageToRDDs) with Logging { - def this(builder: BasicEventFilterBuilder) = { - this(builder.statistics(), builder.liveJobToStages, builder.stageToTasks, builder.stageToRDDs, - builder.liveExecutors) - } - logDebug(s"live executors : $liveExecutors") private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = { From f92b0cbe40c0f198c82bcea07830da668da9eacf Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 3 Jan 2020 18:23:11 +0900 Subject: [PATCH 3/7] safer condition for filtering out events to deal with events out of order --- .../history/BasicEventFilterBuilder.scala | 106 ++++++++-------- .../BasicEventFilterBuilderSuite.scala | 117 +++++++++--------- .../history/BasicEventFilterSuite.scala | 14 ++- 3 files changed, 115 insertions(+), 122 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala index 289e579266e97..7a6a4a1c09cb5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -29,36 +29,50 @@ import org.apache.spark.scheduler._ * and dead executors. */ private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder { - private val _liveJobToStages = new mutable.HashMap[Int, Seq[Int]] + private val _liveJobToStages = new mutable.HashMap[Int, Set[Int]] private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]] - private val _stageToRDDs = new mutable.HashMap[Int, Seq[Int]] + private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]] private val _liveExecutors = new mutable.HashSet[String] private var totalJobs: Long = 0L private var totalStages: Long = 0L private var totalTasks: Long = 0L - def liveJobToStages: Map[Int, Seq[Int]] = _liveJobToStages.toMap - def stageToTasks: Map[Int, Set[Long]] = _stageToTasks.mapValues(_.toSet).toMap - def stageToRDDs: Map[Int, Seq[Int]] = _stageToRDDs.toMap + def liveJobs: Set[Int] = _liveJobToStages.keySet.toSet + def liveStages: Set[Int] = _stageToRDDs.keySet.toSet + def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet + def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet def liveExecutors: Set[String] = _liveExecutors.toSet override def onJobStart(jobStart: SparkListenerJobStart): Unit = { totalJobs += 1 - totalStages += jobStart.stageIds.length - _liveJobToStages += jobStart.jobId -> jobStart.stageIds + jobStart.stageIds.foreach { stageId => + if (_stageToRDDs.get(stageId).isEmpty) { + // stage submit event is not received yet + totalStages += 1 + _stageToRDDs.put(stageId, Set.empty[Int]) + } + } + _liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet } override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int]) _liveJobToStages -= jobEnd.jobId + // This might leave some stages and tasks if job end event comes earlier than 
stage submitted + // or task start event; it's not accurate but safer than dropping wrong events which cannot be + // restored. _stageToTasks --= stages _stageToRDDs --= stages } override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { - _stageToRDDs.getOrElseUpdate(stageSubmitted.stageInfo.stageId, - stageSubmitted.stageInfo.rddInfos.map(_.id)) + val stageId = stageSubmitted.stageInfo.stageId + if (_stageToRDDs.get(stageId).isEmpty) { + // job start event is not received yet + totalStages += 1 + } + _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet) } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { @@ -77,22 +91,10 @@ private[spark] class BasicEventFilterBuilder extends SparkListener with EventFil } override def createFilter(): EventFilter = { - cleanupInvalidStages() - - val stats = FilterStatistics(totalJobs, liveJobToStages.size, totalStages, - liveJobToStages.map(_._2.size).sum, totalTasks, _stageToTasks.map(_._2.size).sum) - - new BasicEventFilter(stats, liveJobToStages, stageToTasks, stageToRDDs, liveExecutors) - } + val stats = FilterStatistics(totalJobs, liveJobs.size, totalStages, + liveStages.size, totalTasks, liveTasks.size) - private def cleanupInvalidStages(): Unit = { - val allValidStages = liveJobToStages.flatMap { case (_, stages) => stages.toSet }.toSet - _stageToTasks.keySet.diff(allValidStages).foreach { stageId => - _stageToTasks.remove(stageId) - } - _stageToRDDs.keySet.diff(allValidStages).foreach { stageId => - _stageToRDDs.remove(stageId) - } + new BasicEventFilter(stats, liveJobs, liveStages, liveTasks, liveRDDs, liveExecutors) } } @@ -104,53 +106,39 @@ private[spark] class BasicEventFilterBuilder extends SparkListener with EventFil */ private[spark] abstract class JobEventFilter( stats: Option[FilterStatistics], - jobToStages: Map[Int, Seq[Int]], - stageToTasks: Map[Int, Set[Long]], - stageToRDDs: Map[Int, Seq[Int]]) extends EventFilter with Logging { + liveJobs: Set[Int], + liveStages: Set[Int], + liveTasks: Set[Long], + liveRDDs: Set[Int]) extends EventFilter with Logging { - private val liveTasks: Set[Long] = stageToTasks.values.flatten.toSet - private val liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet - - logDebug(s"jobs : ${jobToStages.keySet}") - logDebug(s"stages in jobs : ${jobToStages.values.flatten}") - logDebug(s"stages : ${stageToTasks.keySet}") - logDebug(s"tasks in stages : ${stageToTasks.values.flatten}") - logDebug(s"RDDs in stages : ${stageToRDDs.values.flatten}") + logDebug(s"jobs : $liveJobs") + logDebug(s"stages : $liveStages") + logDebug(s"tasks : $liveTasks") + logDebug(s"RDDs : $liveRDDs") override def statistics(): Option[FilterStatistics] = stats protected val acceptFnForJobEvents: PartialFunction[SparkListenerEvent, Boolean] = { case e: SparkListenerStageCompleted => - stageToTasks.contains(e.stageInfo.stageId) - + liveStages.contains(e.stageInfo.stageId) case e: SparkListenerStageSubmitted => - stageToTasks.contains(e.stageInfo.stageId) - + liveStages.contains(e.stageInfo.stageId) case e: SparkListenerTaskStart => liveTasks.contains(e.taskInfo.taskId) - case e: SparkListenerTaskGettingResult => liveTasks.contains(e.taskInfo.taskId) - case e: SparkListenerTaskEnd => liveTasks.contains(e.taskInfo.taskId) - case e: SparkListenerJobStart => - jobToStages.contains(e.jobId) - + liveJobs.contains(e.jobId) case e: SparkListenerJobEnd => - jobToStages.contains(e.jobId) - + liveJobs.contains(e.jobId) case e: SparkListenerUnpersistRDD => liveRDDs.contains(e.rddId) - 
case e: SparkListenerExecutorMetricsUpdate => - e.accumUpdates.exists { case (_, stageId, _, _) => - stageToTasks.contains(stageId) - } - + e.accumUpdates.exists { case (_, stageId, _, _) => liveStages.contains(stageId) } case e: SparkListenerSpeculativeTaskSubmitted => - stageToTasks.contains(e.stageId) + liveStages.contains(e.stageId) } } @@ -161,11 +149,17 @@ private[spark] abstract class JobEventFilter( */ private[spark] class BasicEventFilter( _stats: FilterStatistics, - _liveJobToStages: Map[Int, Seq[Int]], - _stageToTasks: Map[Int, Set[Long]], - _stageToRDDs: Map[Int, Seq[Int]], + _liveJobs: Set[Int], + _liveStages: Set[Int], + _liveTasks: Set[Long], + _liveRDDs: Set[Int], liveExecutors: Set[String]) - extends JobEventFilter(Some(_stats), _liveJobToStages, _stageToTasks, _stageToRDDs) with Logging { + extends JobEventFilter( + Some(_stats), + _liveJobs, + _liveStages, + _liveTasks, + _liveRDDs) with Logging { logDebug(s"live executors : $liveExecutors") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala index ca570c257c025..6e298ad16f74f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala @@ -55,140 +55,134 @@ class BasicEventFilterBuilderSuite extends SparkFunSuite { // Start a job with 2 stages / 4 tasks each time += 1 + val rddsForStage0 = createRdds(2) val rddsForStage1 = createRdds(2) - val rddsForStage2 = createRdds(2) - val stage1 = createStage(rddsForStage1, Nil) - val stage2 = createStage(rddsForStage2, Seq(stage1.stageId)) - val stages = Seq(stage1, stage2) + val stage0 = createStage(rddsForStage0, Nil) + val stage1 = createStage(rddsForStage1, Seq(stage0.stageId)) + val stages = Seq(stage0, stage1) val jobProps = createJobProps() listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps)) - // Submit stage 1 + // Submit stage 0 time += 1 stages.head.submissionTime = Some(time) listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps)) - // Start tasks from stage 1 + // Start tasks from stage 0 time += 1 - val s1Tasks = ListenerEventsTestHelper.createTasks(4, execIds, time) - s1Tasks.foreach { task => + val s0Tasks = ListenerEventsTestHelper.createTasks(4, execIds, time) + s0Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber(), task)) } // Fail one of the tasks, re-start it. time += 1 - s1Tasks.head.markFinished(TaskState.FAILED, time) + s0Tasks.head.markFinished(TaskState.FAILED, time) listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, - "taskType", TaskResultLost, s1Tasks.head, new ExecutorMetrics, null)) + "taskType", TaskResultLost, s0Tasks.head, new ExecutorMetrics, null)) time += 1 - val reattempt = createTaskWithNewAttempt(s1Tasks.head, time) + val reattempt = createTaskWithNewAttempt(s0Tasks.head, time) listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber, reattempt)) - // Succeed all tasks in stage 1. - val pending = s1Tasks.drop(1) ++ Seq(reattempt) + // Succeed all tasks in stage 0. 
+ val pending = s0Tasks.drop(1) ++ Seq(reattempt) - val s1Metrics = TaskMetrics.empty - s1Metrics.setExecutorCpuTime(2L) - s1Metrics.setExecutorRunTime(4L) + val s0Metrics = TaskMetrics.empty + s0Metrics.setExecutorCpuTime(2L) + s0Metrics.setExecutorRunTime(4L) time += 1 pending.foreach { task => task.markFinished(TaskState.FINISHED, time) listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, - "taskType", Success, task, new ExecutorMetrics, s1Metrics)) + "taskType", Success, task, new ExecutorMetrics, s0Metrics)) } - // End stage 1. + // End stage 0. time += 1 stages.head.completionTime = Some(time) listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) - assert(listener.liveJobToStages.keys.toSeq === Seq(1)) - assert(listener.liveJobToStages(1) === Seq(0, 1)) - assert(listener.stageToRDDs.keys.toSeq === Seq(0)) - assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) - // stage 1 not yet submitted - assert(listener.stageToTasks.keys.toSeq === Seq(0)) - assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + assert(listener.liveJobs === Set(1)) + assert(listener.liveStages === Set(0, 1)) + // stage 1 not yet submitted - RDDs for stage 1 is not available + assert(listener.liveRDDs === rddsForStage0.map(_.id).toSet) + assert(listener.liveTasks === (s0Tasks ++ Seq(reattempt)).map(_.taskId).toSet) - // Submit stage 2. + // Submit stage 1. time += 1 stages.last.submissionTime = Some(time) listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, jobProps)) - // Start and fail all tasks of stage 2. + // Start and fail all tasks of stage 1. time += 1 - val s2Tasks = createTasks(4, execIds, time) - s2Tasks.foreach { task => + val s1Tasks = createTasks(4, execIds, time) + s1Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptNumber, task)) } time += 1 - s2Tasks.foreach { task => + s1Tasks.foreach { task => task.markFinished(TaskState.FAILED, time) listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber, "taskType", TaskResultLost, task, new ExecutorMetrics, null)) } - // Fail stage 2. + // Fail stage 1. time += 1 stages.last.completionTime = Some(time) stages.last.failureReason = Some("uh oh") listener.onStageCompleted(SparkListenerStageCompleted(stages.last)) - // - Re-submit stage 2, all tasks, and succeed them and the stage. - val oldS2 = stages.last - val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks, - oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics) + // - Re-submit stage 1, all tasks, and succeed them and the stage. 
+ val oldS1 = stages.last + val newS1 = new StageInfo(oldS1.stageId, oldS1.attemptNumber + 1, oldS1.name, oldS1.numTasks, + oldS1.rddInfos, oldS1.parentIds, oldS1.details, oldS1.taskMetrics) time += 1 - newS2.submissionTime = Some(time) - listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps)) + newS1.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS1, jobProps)) - val newS2Tasks = createTasks(4, execIds, time) + val newS1Tasks = createTasks(4, execIds, time) - newS2Tasks.foreach { task => - listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task)) + newS1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(newS1.stageId, newS1.attemptNumber, task)) } time += 1 - newS2Tasks.foreach { task => + newS1Tasks.foreach { task => task.markFinished(TaskState.FINISHED, time) - listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType", + listener.onTaskEnd(SparkListenerTaskEnd(newS1.stageId, newS1.attemptNumber, "taskType", Success, task, new ExecutorMetrics, null)) } time += 1 - newS2.completionTime = Some(time) - listener.onStageCompleted(SparkListenerStageCompleted(newS2)) + newS1.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(newS1)) - assert(listener.liveJobToStages.keys.toSeq === Seq(1)) - assert(listener.liveJobToStages(1) === Seq(0, 1)) - assert(listener.stageToRDDs.keys === Set(0, 1)) - assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) - assert(listener.stageToRDDs(1) === rddsForStage2.map(_.id)) - assert(listener.stageToTasks.keys.toSet === Set(0, 1)) - // stage 0 is finished but it stores the information regarding stage - assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) - // stage 1 is newly added - assert(listener.stageToTasks(1) === (s2Tasks ++ newS2Tasks).map(_.taskId).toSet) + assert(listener.liveJobs === Set(1)) + assert(listener.liveStages === Set(0, 1)) + // stage 0 and 1 are finished but it stores the information regarding stage + assert(listener.liveRDDs === (rddsForStage0.map(_.id) ++ rddsForStage1.map(_.id)).toSet) + assert(listener.liveTasks === + (s0Tasks ++ Seq(reattempt) ++ s1Tasks ++ newS1Tasks).map(_.taskId).toSet) // Start next job. 
time += 1 + val rddsForStage2 = createRdds(2) val rddsForStage3 = createRdds(2) - val rddsForStage4 = createRdds(2) - val stage3 = createStage(rddsForStage3, Nil) - val stage4 = createStage(rddsForStage4, Seq(stage3.stageId)) + val stage3 = createStage(rddsForStage2, Nil) + val stage4 = createStage(rddsForStage3, Seq(stage3.stageId)) val stagesForJob2 = Seq(stage3, stage4) listener.onJobStart(SparkListenerJobStart(2, time, stagesForJob2, jobProps)) @@ -198,10 +192,13 @@ class BasicEventFilterBuilderSuite extends SparkFunSuite { listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) // everything related to job 1 should be cleaned up, but not for job 2 - assert(listener.liveJobToStages.keys.toSet === Set(2)) - assert(listener.stageToRDDs.isEmpty) + assert(listener.liveJobs === Set(2)) + // there're no stage submit event on job 2, but they will be available on job start event + assert(listener.liveStages === Set(2, 3)) + // no RDD information available as these stages are not submitted yet + assert(listener.liveRDDs.isEmpty) // stageToTasks has no information for job 2, as no task has been started - assert(listener.stageToTasks.isEmpty) + assert(listener.liveTasks.isEmpty) } test("track live executors") { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala index 27f2c4bca5c92..df7c65595e125 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala @@ -30,13 +30,14 @@ class BasicEventFilterSuite extends SparkFunSuite { test("filter out events for finished jobs") { // assume finished job 1 with stage 1, tasks (1, 2), rdds (1, 2) // live job 2 with stages 2, tasks (3, 4), rdds (3, 4) - val liveJobToStages: Map[Int, Seq[Int]] = Map(2 -> Seq(2, 3)) - val stageToTasks: Map[Int, Set[Long]] = Map(2 -> Set(3, 4), 3 -> Set(5, 6)) - val stageToRDDs: Map[Int, Seq[Int]] = Map(2 -> Seq(3, 4), 3 -> Seq(5, 6)) + val liveJobs = Set(2) + val liveStages = Set(2, 3) + val liveTasks = Set(3L, 4L, 5L, 6L) + val liveRDDs = Set(3, 4, 5, 6) val liveExecutors: Set[String] = Set("1", "2") val filterStats = FilterStatistics(2, 1, 2, 1, 4, 2) - val filter = new BasicEventFilter(filterStats, liveJobToStages, stageToTasks, stageToRDDs, + val filter = new BasicEventFilter(filterStats, liveJobs, liveStages, liveTasks, liveRDDs, liveExecutors) val acceptFn = filter.acceptFn().lift @@ -99,7 +100,7 @@ class BasicEventFilterSuite extends SparkFunSuite { // assume executor 1 was dead, and live executor 2 is available val liveExecutors: Set[String] = Set("2") - val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, + val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, liveExecutors) val acceptFn = filter.acceptFn().lift @@ -133,7 +134,8 @@ class BasicEventFilterSuite extends SparkFunSuite { assert(predicate === None) } - val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, Set.empty) + val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, + Set.empty) val acceptFn = filter.acceptFn().lift assertNone(acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) From 9a7a210d8b1214d7d9251bcf6b32a412e1d6e803 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 6 Jan 2020 12:36:19 +0900 Subject: [PATCH 4/7] Reflect review comments --- 
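Editor's note (illustrative sketch, not part of the commit): with this change compact() returns
a CompactionResult case class instead of the previous (CompactionResult.Value, Option[Long])
tuple. A minimal caller-side sketch of consuming the new result, assuming `sparkConf`,
`hadoopConf`, `fs` and the index-sorted `eventLogFiles` list are already in scope:

  import org.apache.spark.deploy.history.{CompactionResult, CompactionResultCode, EventLogFileCompactor}

  val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
  compactor.compact(eventLogFiles) match {
    case CompactionResult(CompactionResultCode.SUCCESS, Some(index)) =>
      // event log files up to `index` were rewritten into a single compact file (the last
      // compacted file's name plus EventLogFileWriter.COMPACTED) and then deleted, so the
      // caller should re-read the list of event log files at this point
    case CompactionResult(CompactionResultCode.NOT_ENOUGH_FILES, _) =>
      // fewer files than EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN; nothing was compacted
    case CompactionResult(CompactionResultCode.LOW_SCORE_FOR_COMPACTION, _) =>
      // the estimated rate of filtered-out task events is below
      // EVENT_LOG_COMPACTION_SCORE_THRESHOLD, so compaction would not reduce the log much
  }
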
.../history/BasicEventFilterBuilder.scala | 6 +- .../history/EventLogFileCompactor.scala | 112 ++++++++++-------- .../BasicEventFilterBuilderSuite.scala | 6 +- .../history/BasicEventFilterSuite.scala | 88 +++++++------- .../history/EventLogFileCompactorSuite.scala | 106 +++++++++++++++-- .../deploy/history/EventLogTestHelper.scala | 23 ---- .../FilteredEventLogFileRewriterSuite.scala | 90 -------------- .../status/ListenerEventsTestHelper.scala | 47 -------- 8 files changed, 206 insertions(+), 272 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala index 7a6a4a1c09cb5..4294032dd3462 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -136,7 +136,9 @@ private[spark] abstract class JobEventFilter( case e: SparkListenerUnpersistRDD => liveRDDs.contains(e.rddId) case e: SparkListenerExecutorMetricsUpdate => - e.accumUpdates.exists { case (_, stageId, _, _) => liveStages.contains(stageId) } + e.accumUpdates.exists { case (taskId, stageId, _, _) => + liveTasks.contains(taskId) || liveStages.contains(stageId) + } case e: SparkListenerSpeculativeTaskSubmitted => liveStages.contains(e.stageId) } @@ -169,6 +171,8 @@ private[spark] class BasicEventFilter( case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId) case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId) case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId) + case _: SparkListenerBlockManagerAdded => true + case _: SparkListenerBlockManagerRemoved => true } override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala index 686ed7cba0a49..12834fa2ee982 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -45,15 +45,6 @@ import org.apache.spark.util.Utils * This class will calculate the score based on statistic from [[EventFilter]] instances, which * represents approximate rate of filtered-out events. Score is being calculated via applying * heuristic; task events tend to take most size in event log. - * - * This class assumes caller will provide the sorted list of files which are sorted by the index of - * event log file, with "at most" one compact file placed first if it exists. Caller should keep in - * mind that this class doesn't care about the semantic of ordering. - * - * When compacting the files, the range of compaction for given file list is determined as: - * (first ~ the file where there're `maxFilesToRetain` files on the right side) - * - * If there're not enough files on the range of compaction, compaction will be skipped. 
*/ class EventLogFileCompactor( sparkConf: SparkConf, @@ -62,16 +53,34 @@ class EventLogFileCompactor( private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN) private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD) - def compact(eventLogFiles: Seq[FileStatus]): (CompactionResult.Value, Option[Long]) = { + /** + * Compacts the old event log files into one compact file, and clean old event log files being + * compacted away. + * + * This method assumes caller will provide the sorted list of files which are sorted by + * the index of event log file, with at most one compact file placed first if it exists. + * + * When compacting the files, the range of compaction for given file list is determined as: + * (first ~ the file where there're `maxFilesToRetain` files on the right side) + * + * This method skips compaction for some circumstances described below: + * - not enough files on the range of compaction + * - score is lower than the threshold of compaction (meaning compaction won't help much) + * + * If this method returns the compaction result as SUCCESS, caller needs to re-read the list + * of event log files, as new compact file is available as well as old event log files are + * removed. + */ + def compact(eventLogFiles: Seq[FileStatus]): CompactionResult = { assertPrecondition(eventLogFiles) if (eventLogFiles.length < maxFilesToRetain) { - return (CompactionResult.NOT_ENOUGH_FILES, None) + return CompactionResult(CompactionResultCode.NOT_ENOUGH_FILES, None) } val filesToCompact = findFilesToCompact(eventLogFiles) if (filesToCompact.isEmpty) { - (CompactionResult.NOT_ENOUGH_FILES, None) + CompactionResult(CompactionResultCode.NOT_ENOUGH_FILES, None) } else { val builders = initializeBuilders(fs, filesToCompact.map(_.getPath)) @@ -79,13 +88,12 @@ class EventLogFileCompactor( val minScore = filters.flatMap(_.statistics()).map(calculateScore).min if (minScore < compactionThresholdScore) { - (CompactionResult.LOW_SCORE_FOR_COMPACTION, None) + CompactionResult(CompactionResultCode.LOW_SCORE_FOR_COMPACTION, None) } else { - val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) - rewriter.rewrite(filesToCompact) + rewrite(filters, filesToCompact) cleanupCompactedFiles(filesToCompact) - (CompactionResult.SUCCESS, Some(RollingEventLogFilesWriter.getEventLogFileIndex( - filesToCompact.last.getPath.getName))) + CompactionResult(CompactionResultCode.SUCCESS, Some( + RollingEventLogFilesWriter.getEventLogFileIndex(filesToCompact.last.getPath.getName))) } } } @@ -125,6 +133,34 @@ class EventLogFileCompactor( (stats.totalTasks - stats.liveTasks) * 1.0 / stats.totalTasks } + /** + * This class rewrites the event log files into one compact file: the compact file will only + * contain the events which pass the filters. Events will be dropped only when all filters + * decide to reject the event or don't mind about the event. Otherwise, the original line for + * the event is written to the compact file as it is. 
+ */ + private[history] def rewrite( + filters: Seq[EventFilter], + eventLogFiles: Seq[FileStatus]): String = { + require(eventLogFiles.nonEmpty) + + val lastIndexEventLogPath = eventLogFiles.last.getPath + val logWriter = new CompactedEventLogFileWriter(lastIndexEventLogPath, "dummy", None, + lastIndexEventLogPath.getParent.toUri, sparkConf, hadoopConf) + + logWriter.start() + eventLogFiles.foreach { file => + EventFilter.applyFilterToFile(fs, filters, file.getPath, + onAccepted = (line, _) => logWriter.writeEvent(line, flushLogger = true), + onRejected = (_, _) => {}, + onUnidentified = line => logWriter.writeEvent(line, flushLogger = true) + ) + } + logWriter.stop() + + logWriter.logPath + } + private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = { files.foreach { file => var deleted = false @@ -157,41 +193,17 @@ class EventLogFileCompactor( } } -object CompactionResult extends Enumeration { - val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value -} - /** - * This class rewrites the event log files into one compact file: the compact file will only - * contain the events which pass the filters. Events will be dropped only when all filters - * decide to reject the event or don't mind about the event. Otherwise, the original line for - * the event is written to the compact file as it is. + * Describes the result of compaction. + * + * @param code The result of compaction. + * @param compactIndex The index of compact file if the compaction is successful. + * Otherwise it will be None. */ -class FilteredEventLogFileRewriter( - sparkConf: SparkConf, - hadoopConf: Configuration, - fs: FileSystem, - filters: Seq[EventFilter]) { +case class CompactionResult(code: CompactionResultCode.Value, compactIndex: Option[Long]) - def rewrite(eventLogFiles: Seq[FileStatus]): String = { - require(eventLogFiles.nonEmpty) - - val lastIndexEventLogPath = eventLogFiles.last.getPath - val logWriter = new CompactedEventLogFileWriter(lastIndexEventLogPath, "dummy", None, - lastIndexEventLogPath.getParent.toUri, sparkConf, hadoopConf) - - logWriter.start() - eventLogFiles.foreach { file => - EventFilter.applyFilterToFile(fs, filters, file.getPath, - onAccepted = (line, _) => logWriter.writeEvent(line, flushLogger = true), - onRejected = (_, _) => {}, - onUnidentified = line => logWriter.writeEvent(line, flushLogger = true) - ) - } - logWriter.stop() - - logWriter.logPath - } +object CompactionResultCode extends Enumeration { + val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value } /** @@ -199,7 +211,7 @@ class FilteredEventLogFileRewriter( * [[SingleEventLogFileWriter]], but only `originalFilePath` is used to determine the * path of compact file. */ -class CompactedEventLogFileWriter( +private class CompactedEventLogFileWriter( originalFilePath: Path, appId: String, appAttemptId: Option[String], diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala index 6e298ad16f74f..3fd6061bb1990 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala @@ -93,15 +93,11 @@ class BasicEventFilterBuilderSuite extends SparkFunSuite { // Succeed all tasks in stage 0. 
val pending = s0Tasks.drop(1) ++ Seq(reattempt) - val s0Metrics = TaskMetrics.empty - s0Metrics.setExecutorCpuTime(2L) - s0Metrics.setExecutorRunTime(4L) - time += 1 pending.foreach { task => task.markFinished(TaskState.FINISHED, time) listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, - "taskType", Success, task, new ExecutorMetrics, s0Metrics)) + "taskType", Success, task, new ExecutorMetrics, TaskMetrics.empty)) } // End stage 0. diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala index df7c65595e125..cd238f662c5d6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala @@ -29,13 +29,23 @@ class BasicEventFilterSuite extends SparkFunSuite { test("filter out events for finished jobs") { // assume finished job 1 with stage 1, tasks (1, 2), rdds (1, 2) - // live job 2 with stages 2, tasks (3, 4), rdds (3, 4) + // live job 2 with stage 2 with tasks (3, 4) & rdds (3, 4), + // and stage 3 with tasks (5, 6) & rdds (5, 6) val liveJobs = Set(2) val liveStages = Set(2, 3) val liveTasks = Set(3L, 4L, 5L, 6L) val liveRDDs = Set(3, 4, 5, 6) val liveExecutors: Set[String] = Set("1", "2") - val filterStats = FilterStatistics(2, 1, 2, 1, 4, 2) + val filterStats = FilterStatistics( + // counts finished job 1 + liveJobs.size + 1, + liveJobs.size, + // counts finished stage 1 for job 1 + liveStages.size + 1, + liveStages.size, + // counts finished tasks (1, 2) for job 1 + liveTasks.size + 2, + liveTasks.size) val filter = new BasicEventFilter(filterStats, liveJobs, liveStages, liveTasks, liveRDDs, liveExecutors) @@ -54,7 +64,8 @@ class BasicEventFilterSuite extends SparkFunSuite { val unpersistRDDEventsForJob1 = (1 to 2).map(SparkListenerUnpersistRDD) // job events for finished job should be rejected - assertFilterJobEvents(acceptFn, jobStartEventForJob1, jobEndEventForJob1, Some(false)) + assert(Some(false) === acceptFn(jobStartEventForJob1)) + assert(Some(false) === acceptFn(jobEndEventForJob1)) // stage events for finished job should be rejected // NOTE: it doesn't filter out stage events which are also related to the executor @@ -81,7 +92,7 @@ class BasicEventFilterSuite extends SparkFunSuite { val unpersistRDDEventsForJob2 = rddsForStage2.map { rdd => SparkListenerUnpersistRDD(rdd.id) } // job events for live job should be accepted - assert(acceptFn(jobStartEventForJob2) === Some(true)) + assert(Some(true) === acceptFn(jobStartEventForJob2)) // stage events for live job should be accepted assertFilterStageEvents( @@ -96,6 +107,16 @@ class BasicEventFilterSuite extends SparkFunSuite { assertFilterTaskEvents(acceptFn, tasksForStage2, stage2, Some(true)) } + test("accept all events for block manager addition/removal") { + val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, + Set.empty) + val acceptFn = filter.acceptFn().lift + + val bmId = BlockManagerId("1", "host1", 1) + assert(Some(true) === acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) + assert(Some(true) === acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + } + test("filter out events for dead executors") { // assume executor 1 was dead, and live executor 2 is available val liveExecutors: Set[String] = Set("2") @@ -105,59 +126,38 @@ class BasicEventFilterSuite extends SparkFunSuite { val acceptFn = 
filter.acceptFn().lift // events for dead executor should be rejected - assert(acceptFn(createExecutorAddedEvent(1)) === Some(false)) + assert(Some(false) === acceptFn(createExecutorAddedEvent(1))) // though the name of event is stage executor metrics, AppStatusListener only deals with // live executors - assert(acceptFn( - SparkListenerStageExecutorMetrics(1.toString, 0, 0, new ExecutorMetrics)) === - Some(false)) - assert(acceptFn(SparkListenerExecutorBlacklisted(0, 1.toString, 1)) === - Some(false)) - assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 1.toString)) === - Some(false)) - assert(acceptFn(createExecutorRemovedEvent(1)) === Some(false)) + assert(Some(false) === acceptFn( + SparkListenerStageExecutorMetrics(1.toString, 0, 0, new ExecutorMetrics))) + assert(Some(false) === acceptFn(SparkListenerExecutorBlacklisted(0, 1.toString, 1))) + assert(Some(false) === acceptFn(SparkListenerExecutorUnblacklisted(0, 1.toString))) + assert(Some(false) === acceptFn(createExecutorRemovedEvent(1))) // events for live executor should be accepted - assert(acceptFn(createExecutorAddedEvent(2)) === Some(true)) - assert(acceptFn( - SparkListenerStageExecutorMetrics(2.toString, 0, 0, new ExecutorMetrics)) === - Some(true)) - assert(acceptFn(SparkListenerExecutorBlacklisted(0, 2.toString, 1)) === - Some(true)) - assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 2.toString)) === - Some(true)) - assert(acceptFn(createExecutorRemovedEvent(2)) === Some(true)) + assert(Some(true) === acceptFn(createExecutorAddedEvent(2))) + assert(Some(true) === acceptFn( + SparkListenerStageExecutorMetrics(2.toString, 0, 0, new ExecutorMetrics))) + assert(Some(true) === acceptFn(SparkListenerExecutorBlacklisted(0, 2.toString, 1))) + assert(Some(true) === acceptFn(SparkListenerExecutorUnblacklisted(0, 2.toString))) + assert(Some(true) === acceptFn(createExecutorRemovedEvent(2))) } test("other events should be left to other filters") { - def assertNone(predicate: => Option[Boolean]): Unit = { - assert(predicate === None) - } - val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, Set.empty) val acceptFn = filter.acceptFn().lift - assertNone(acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) - assertNone(acceptFn(SparkListenerApplicationStart("1", Some("1"), 0, "user", None))) - assertNone(acceptFn(SparkListenerApplicationEnd(1))) + assert(None === acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) + assert(None === acceptFn(SparkListenerApplicationStart("1", Some("1"), 0, "user", None))) + assert(None === acceptFn(SparkListenerApplicationEnd(1))) val bmId = BlockManagerId("1", "host1", 1) - assertNone(acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) - assertNone(acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) - assertNone(acceptFn(SparkListenerBlockUpdated( + assert(None === acceptFn(SparkListenerBlockUpdated( storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) - assertNone(acceptFn(SparkListenerNodeBlacklisted(0, "host1", 1))) - assertNone(acceptFn(SparkListenerNodeUnblacklisted(0, "host1"))) - assertNone(acceptFn(SparkListenerLogStart("testVersion"))) - } - - private def assertFilterJobEvents( - acceptFn: SparkListenerEvent => Option[Boolean], - jobStart: SparkListenerJobStart, - jobEnd: SparkListenerJobEnd, - expectedVal: Option[Boolean]): Unit = { - assert(acceptFn(jobStart) === expectedVal) - assert(acceptFn(jobEnd) === expectedVal) + assert(None === acceptFn(SparkListenerNodeBlacklisted(0, "host1", 1))) + assert(None === 
acceptFn(SparkListenerNodeUnblacklisted(0, "host1"))) + assert(None === acceptFn(SparkListenerLogStart("testVersion"))) } private def assertFilterStageEvents( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala index 8c216acb5267a..4420dce527fe4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.history +import scala.collection.mutable import scala.io.{Codec, Source} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} @@ -30,6 +31,7 @@ import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, E import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.status.ListenerEventsTestHelper._ +import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{JsonProtocol, Utils} class EventLogFileCompactorSuite extends SparkFunSuite { @@ -42,7 +44,7 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) assertNoCompaction(fs, Seq.empty, compactor.compact(Seq.empty), - CompactionResult.NOT_ENOUGH_FILES) + CompactionResultCode.NOT_ENOUGH_FILES) } } @@ -54,7 +56,7 @@ class EventLogFileCompactorSuite extends SparkFunSuite { (1 to 2).map(_ => testEvent): _*) val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), - CompactionResult.NOT_ENOUGH_FILES) + CompactionResultCode.NOT_ENOUGH_FILES) } } @@ -85,7 +87,7 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), - CompactionResult.NOT_ENOUGH_FILES) + CompactionResultCode.NOT_ENOUGH_FILES) } } @@ -104,7 +106,7 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), - CompactionResult.NOT_ENOUGH_FILES) + CompactionResultCode.NOT_ENOUGH_FILES) } } @@ -179,16 +181,96 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val compactor = new EventLogFileCompactor(newConf, hadoopConf, fs) assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), - CompactionResult.LOW_SCORE_FOR_COMPACTION) + CompactionResultCode.LOW_SCORE_FOR_COMPACTION) + } + } + + test("rewrite files with test filters") { + class TestEventFilter1 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerBlockManagerAdded => true + case _: SparkListenerApplicationStart => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } + + class TestEventFilter2 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerEnvironmentUpdate => true + case _: SparkListenerNodeBlacklisted => true + case _: 
SparkListenerBlockManagerAdded => false + case _: SparkListenerApplicationStart => false + case _: SparkListenerNodeUnblacklisted => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } + + def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = { + val line = EventLogTestHelper.convertEvent(event) + writer.writeEvent(line, flushLogger = true) + line + } + + withTempDir { tempDir => + val sparkConf = new SparkConf + val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf) + + val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf) + writer.start() + + val expectedLines = new mutable.ArrayBuffer[String] + + // filterApplicationEnd: Some(true) & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0)) + + // filterBlockManagerAdded: Some(true) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1), + 10)) + + // filterApplicationStart: Some(false) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None)) + + // filterNodeBlacklisted: None & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1)) + + // filterNodeUnblacklisted: None & Some(false) => filter out + writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1")) + + // other events: None & None => filter in + expectedLines += writeEventToWriter(writer, SparkListenerUnpersistRDD(0)) + + writer.stop() + + val filters = Seq(new TestEventFilter1, new TestEventFilter2) + + val logPath = new Path(writer.logPath) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val newPath = compactor.rewrite(filters, Seq(fs.getFileStatus(logPath))) + assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED) + + Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines() + var linesLength = 0 + lines.foreach { line => + linesLength += 1 + assert(expectedLines.contains(line)) + } + assert(linesLength === expectedLines.length) + } } } private def assertCompaction( fs: FileSystem, originalFiles: Seq[FileStatus], - compactRet: (CompactionResult.Value, Option[Long]), + compactRet: CompactionResult, expectedNumOfFilesCompacted: Int): Unit = { - assert(CompactionResult.SUCCESS === compactRet._1) + assert(CompactionResultCode.SUCCESS === compactRet.code) val expectRetainedFiles = originalFiles.drop(expectedNumOfFilesCompacted) expectRetainedFiles.foreach { status => assert(fs.exists(status.getPath)) } @@ -199,7 +281,7 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val expectCompactFileBasePath = originalFiles.take(expectedNumOfFilesCompacted).last.getPath val expectCompactFileIndex = RollingEventLogFilesWriter.getEventLogFileIndex( expectCompactFileBasePath.getName) - assert(Some(expectCompactFileIndex) === compactRet._2) + assert(Some(expectCompactFileIndex) === compactRet.compactIndex) val expectCompactFilePath = getCompactFilePath(expectCompactFileBasePath) assert(fs.exists(expectCompactFilePath)) @@ -213,10 +295,10 @@ class EventLogFileCompactorSuite extends SparkFunSuite { private def assertNoCompaction( fs: FileSystem, originalFiles: Seq[FileStatus], - compactRet: (CompactionResult.Value, 
Option[Long]), - expectedCompactRet: CompactionResult.Value): Unit = { - assert(compactRet._1 === expectedCompactRet) - assert(None === compactRet._2) + compactRet: CompactionResult, + expectedCompactRet: CompactionResultCode.Value): Unit = { + assert(expectedCompactRet === compactRet.code) + assert(None === compactRet.compactIndex) originalFiles.foreach { status => assert(fs.exists(status.getPath)) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala index 85a2acb3477f1..298fd65f293cb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala @@ -109,27 +109,4 @@ object EventLogTestHelper { def convertEvent(event: SparkListenerEvent): String = { compact(render(JsonProtocol.sparkEventToJson(event))) } - - class TestEventFilter1 extends EventFilter { - override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { - case _: SparkListenerApplicationEnd => true - case _: SparkListenerBlockManagerAdded => true - case _: SparkListenerApplicationStart => false - } - - override def statistics(): Option[EventFilter.FilterStatistics] = None - } - - class TestEventFilter2 extends EventFilter { - override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { - case _: SparkListenerApplicationEnd => true - case _: SparkListenerEnvironmentUpdate => true - case _: SparkListenerNodeBlacklisted => true - case _: SparkListenerBlockManagerAdded => false - case _: SparkListenerApplicationStart => false - case _: SparkListenerNodeUnblacklisted => false - } - - override def statistics(): Option[EventFilter.FilterStatistics] = None - } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala deleted file mode 100644 index 96883a7647b4e..0000000000000 --- a/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.history - -import scala.collection.mutable -import scala.io.{Codec, Source} - -import org.apache.hadoop.fs.Path - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.history.EventLogTestHelper.{TestEventFilter1, TestEventFilter2} -import org.apache.spark.scheduler._ -import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.Utils - -class FilteredEventLogFileRewriterSuite extends SparkFunSuite { - test("rewrite files with test filters") { - def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = { - val line = EventLogTestHelper.convertEvent(event) - writer.writeEvent(line, flushLogger = true) - line - } - - withTempDir { tempDir => - val sparkConf = new SparkConf - val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) - val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf) - - val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf) - writer.start() - - val expectedLines = new mutable.ArrayBuffer[String] - - // filterApplicationEnd: Some(true) & Some(true) => filter in - expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0)) - - // filterBlockManagerAdded: Some(true) & Some(false) => filter out - writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1), - 10)) - - // filterApplicationStart: Some(false) & Some(false) => filter out - writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None)) - - // filterNodeBlacklisted: None & Some(true) => filter in - expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1)) - - // filterNodeUnblacklisted: None & Some(false) => filter out - writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1")) - - // other events: None & None => filter in - expectedLines += writeEventToWriter(writer, SparkListenerUnpersistRDD(0)) - - writer.stop() - - val filters = Seq(new TestEventFilter1, new TestEventFilter2) - - val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) - val logPath = new Path(writer.logPath) - val newPath = rewriter.rewrite(Seq(fs.getFileStatus(logPath))) - assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED) - - Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is => - val lines = Source.fromInputStream(is)(Codec.UTF8).getLines() - var linesLength = 0 - lines.foreach { line => - linesLength += 1 - assert(expectedLines.contains(line)) - } - assert(linesLength === expectedLines.length, "The number of lines for rewritten file " + - s"is not expected: expected ${expectedLines.length} / actual $linesLength") - } - } - } -} diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala index 37a35744ada5e..585c8cc2ae6d4 100644 --- a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala @@ -137,53 +137,6 @@ object ListenerEventsTestHelper { SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) } - case class JobInfo( - stageIds: Seq[Int], - stageToTaskIds: Map[Int, Seq[Long]], - stageToRddIds: Map[Int, Seq[Int]]) - - def pushJobEventsWithoutJobEnd( - listener: SparkListener, - jobId: Int, 
- jobProps: Properties, - execIds: Array[String], - time: Long): JobInfo = { - // Start a job with 1 stages / 4 tasks each - val rddsForStage = createRdds(2) - val stage = createStage(rddsForStage, Nil) - - listener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage), jobProps)) - - // Submit stage - stage.submissionTime = Some(time) - listener.onStageSubmitted(SparkListenerStageSubmitted(stage, jobProps)) - - // Start tasks from stage - val s1Tasks = createTasks(4, execIds, time) - s1Tasks.foreach { task => - listener.onTaskStart(SparkListenerTaskStart(stage.stageId, - stage.attemptNumber(), task)) - } - - // Succeed all tasks in stage. - val s1Metrics = TaskMetrics.empty - s1Metrics.setExecutorCpuTime(2L) - s1Metrics.setExecutorRunTime(4L) - - s1Tasks.foreach { task => - task.markFinished(TaskState.FINISHED, time) - listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, - "taskType", Success, task, new ExecutorMetrics, s1Metrics)) - } - - // End stage. - stage.completionTime = Some(time) - listener.onStageCompleted(SparkListenerStageCompleted(stage)) - - JobInfo(Seq(stage.stageId), Map(stage.stageId -> s1Tasks.map(_.taskId)), - Map(stage.stageId -> rddsForStage.map(_.id))) - } - private def nextTaskId(): Long = { taskIdTracker += 1 taskIdTracker From be77ce1343072e3d77bd3027859707c844804b9c Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 7 Jan 2020 13:29:17 +0900 Subject: [PATCH 5/7] Reflect review comments --- .../history/BasicEventFilterBuilder.scala | 37 +++++++++---------- .../BasicEventFilterBuilderSuite.scala | 5 +-- .../history/BasicEventFilterSuite.scala | 21 ++++++++--- .../history/EventLogFileCompactorSuite.scala | 21 ++++++++--- 4 files changed, 50 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala index 4294032dd3462..22979dc057d44 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -19,9 +19,11 @@ package org.apache.spark.deploy.history import scala.collection.mutable +import org.apache.spark.SparkContext import org.apache.spark.deploy.history.EventFilter.FilterStatistics import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ +import org.apache.spark.storage.BlockManagerId /** * This class tracks both live jobs and live executors, and pass the list to the @@ -46,40 +48,30 @@ private[spark] class BasicEventFilterBuilder extends SparkListener with EventFil override def onJobStart(jobStart: SparkListenerJobStart): Unit = { totalJobs += 1 - jobStart.stageIds.foreach { stageId => - if (_stageToRDDs.get(stageId).isEmpty) { - // stage submit event is not received yet - totalStages += 1 - _stageToRDDs.put(stageId, Set.empty[Int]) - } - } + totalStages += jobStart.stageIds.length _liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet } override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int]) _liveJobToStages -= jobEnd.jobId - // This might leave some stages and tasks if job end event comes earlier than stage submitted - // or task start event; it's not accurate but safer than dropping wrong events which cannot be - // restored. 
_stageToTasks --= stages _stageToRDDs --= stages } override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { val stageId = stageSubmitted.stageInfo.stageId - if (_stageToRDDs.get(stageId).isEmpty) { - // job start event is not received yet - totalStages += 1 - } _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet) + _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]()) } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - totalTasks += 1 - val curTasks = _stageToTasks.getOrElseUpdate(taskStart.stageId, - mutable.HashSet[Long]()) - curTasks += taskStart.taskInfo.taskId + _stageToTasks.get(taskStart.stageId) match { + case Some(tasks) => + totalTasks += 1 + tasks += taskStart.taskInfo.taskId + case _ => + } } override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { @@ -171,8 +163,13 @@ private[spark] class BasicEventFilter( case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId) case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId) case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId) - case _: SparkListenerBlockManagerAdded => true - case _: SparkListenerBlockManagerRemoved => true + case e: SparkListenerBlockManagerAdded => acceptBlockManagerEvent(e.blockManagerId) + case e: SparkListenerBlockManagerRemoved => acceptBlockManagerEvent(e.blockManagerId) + case e: SparkListenerBlockUpdated => acceptBlockManagerEvent(e.blockUpdatedInfo.blockManagerId) + } + + private def acceptBlockManagerEvent(blockManagerId: BlockManagerId): Boolean = { + blockManagerId.isDriver || liveExecutors.contains(blockManagerId.executorId) } override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala index 3fd6061bb1990..86511ae08784a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala @@ -106,7 +106,7 @@ class BasicEventFilterBuilderSuite extends SparkFunSuite { listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) assert(listener.liveJobs === Set(1)) - assert(listener.liveStages === Set(0, 1)) + assert(listener.liveStages === Set(0)) // stage 1 not yet submitted - RDDs for stage 1 is not available assert(listener.liveRDDs === rddsForStage0.map(_.id).toSet) assert(listener.liveTasks === (s0Tasks ++ Seq(reattempt)).map(_.taskId).toSet) @@ -189,8 +189,7 @@ class BasicEventFilterBuilderSuite extends SparkFunSuite { // everything related to job 1 should be cleaned up, but not for job 2 assert(listener.liveJobs === Set(2)) - // there're no stage submit event on job 2, but they will be available on job start event - assert(listener.liveStages === Set(2, 3)) + assert(listener.liveStages.isEmpty) // no RDD information available as these stages are not submitted yet assert(listener.liveRDDs.isEmpty) // stageToTasks has no information for job 2, as no task has been started diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala index cd238f662c5d6..2da40dccba53e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import org.apache.spark.{storage, SparkFunSuite, Success, TaskState} +import org.apache.spark.{storage, SparkContext, SparkFunSuite, Success, TaskState} import org.apache.spark.deploy.history.EventFilter.FilterStatistics import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.scheduler._ @@ -107,14 +107,16 @@ class BasicEventFilterSuite extends SparkFunSuite { assertFilterTaskEvents(acceptFn, tasksForStage2, stage2, Some(true)) } - test("accept all events for block manager addition/removal") { + test("accept all events for block manager addition/removal on driver") { val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, Set.empty) val acceptFn = filter.acceptFn().lift - val bmId = BlockManagerId("1", "host1", 1) + val bmId = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "host1", 1) assert(Some(true) === acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) assert(Some(true) === acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + assert(Some(true) === acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) } test("filter out events for dead executors") { @@ -134,6 +136,11 @@ class BasicEventFilterSuite extends SparkFunSuite { assert(Some(false) === acceptFn(SparkListenerExecutorBlacklisted(0, 1.toString, 1))) assert(Some(false) === acceptFn(SparkListenerExecutorUnblacklisted(0, 1.toString))) assert(Some(false) === acceptFn(createExecutorRemovedEvent(1))) + val bmId = BlockManagerId(1.toString, "host1", 1) + assert(Some(false) === acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) + assert(Some(false) === acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + assert(Some(false) === acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) // events for live executor should be accepted assert(Some(true) === acceptFn(createExecutorAddedEvent(2))) @@ -142,6 +149,11 @@ class BasicEventFilterSuite extends SparkFunSuite { assert(Some(true) === acceptFn(SparkListenerExecutorBlacklisted(0, 2.toString, 1))) assert(Some(true) === acceptFn(SparkListenerExecutorUnblacklisted(0, 2.toString))) assert(Some(true) === acceptFn(createExecutorRemovedEvent(2))) + val bmId2 = BlockManagerId(2.toString, "host1", 1) + assert(Some(true) === acceptFn(SparkListenerBlockManagerAdded(0, bmId2, 1))) + assert(Some(true) === acceptFn(SparkListenerBlockManagerRemoved(1, bmId2))) + assert(Some(true) === acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId2, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) } test("other events should be left to other filters") { @@ -152,9 +164,6 @@ class BasicEventFilterSuite extends SparkFunSuite { assert(None === acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) assert(None === acceptFn(SparkListenerApplicationStart("1", Some("1"), 0, "user", None))) assert(None === acceptFn(SparkListenerApplicationEnd(1))) - val bmId = BlockManagerId("1", "host1", 1) - assert(None === acceptFn(SparkListenerBlockUpdated( - storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) assert(None === acceptFn(SparkListenerNodeBlacklisted(0, "host1", 1))) assert(None === acceptFn(SparkListenerNodeUnblacklisted(0, "host1"))) assert(None === acceptFn(SparkListenerLogStart("testVersion"))) diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala index 4420dce527fe4..72da1da9fb600 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -168,13 +168,24 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) val newConf = sparkConf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.7d) - // only one of two tasks is finished, which would score 0.5d - val tasks = createTasks(2, Array("exec1"), 0L).map(createTaskStartEvent(_, 1, 0)) - + // job 1 having 4 tasks + val rddsForStage1 = createRddsWithId(1 to 2) + val stage1 = createStage(1, rddsForStage1, Nil) + val tasks = createTasks(4, Array("exec1"), 0L).map(createTaskStartEvent(_, 1, 0)) + + // job 2 having 4 tasks + val rddsForStage2 = createRddsWithId(3 to 4) + val stage2 = createStage(2, rddsForStage2, Nil) + val tasks2 = createTasks(4, Array("exec1"), 0L).map(createTaskStartEvent(_, 2, 0)) + + // here job 1 is finished and job 2 is still live, hence half of total tasks are considered + // as live val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, newConf, hadoopConf, + Seq(SparkListenerJobStart(1, 0, Seq(stage1)), SparkListenerStageSubmitted(stage1)), tasks, - Seq(SparkListenerTaskEnd(1, 0, "taskType", Success, tasks.head.taskInfo, - new ExecutorMetrics, null)), + Seq(SparkListenerJobStart(2, 0, Seq(stage2)), SparkListenerStageSubmitted(stage2)), + tasks2, + Seq(SparkListenerJobEnd(1, 0, JobSucceeded)), testEvent, testEvent, testEvent) From 29ba0f2bc937d459cea434061f921050b68b8c73 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 9 Jan 2020 09:00:21 +0900 Subject: [PATCH 6/7] Reflect review comments --- .../spark/deploy/history/BasicEventFilterBuilder.scala | 8 +++----- .../org/apache/spark/deploy/history/EventFilter.scala | 2 +- .../spark/deploy/history/EventLogFileCompactor.scala | 10 ++++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala index 22979dc057d44..106da1675f71e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -66,11 +66,9 @@ private[spark] class BasicEventFilterBuilder extends SparkListener with EventFil } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - _stageToTasks.get(taskStart.stageId) match { - case Some(tasks) => - totalTasks += 1 - tasks += taskStart.taskInfo.taskId - case _ => + totalTasks += 1 + _stageToTasks.get(taskStart.stageId).foreach { tasks => + tasks += taskStart.taskInfo.taskId } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala index edbb34bff77d8..fbb44efd7430b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala @@ -59,7 +59,7 @@ private[spark] trait EventFilter { def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] } -object EventFilter extends Logging { +private[spark] object EventFilter 
extends Logging { case class FilterStatistics( totalJobs: Long, liveJobs: Long, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala index 12834fa2ee982..ef055ea288b65 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.Utils * represents approximate rate of filtered-out events. Score is being calculated via applying * heuristic; task events tend to take most size in event log. */ -class EventLogFileCompactor( +private[spark] class EventLogFileCompactor( sparkConf: SparkConf, hadoopConf: Configuration, fs: FileSystem) extends Logging { @@ -134,7 +134,7 @@ class EventLogFileCompactor( } /** - * This class rewrites the event log files into one compact file: the compact file will only + * This method rewrites the event log files into one compact file: the compact file will only * contain the events which pass the filters. Events will be dropped only when all filters * decide to reject the event or don't mind about the event. Otherwise, the original line for * the event is written to the compact file as it is. @@ -200,9 +200,11 @@ class EventLogFileCompactor( * @param compactIndex The index of compact file if the compaction is successful. * Otherwise it will be None. */ -case class CompactionResult(code: CompactionResultCode.Value, compactIndex: Option[Long]) +private[spark] case class CompactionResult( + code: CompactionResultCode.Value, + compactIndex: Option[Long]) -object CompactionResultCode extends Enumeration { +private[spark] object CompactionResultCode extends Enumeration { val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value } From 163bda018d054a0d41e983e6c448ce3b3e746d35 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 9 Jan 2020 09:50:15 +0900 Subject: [PATCH 7/7] Fixed my bad --- .../org/apache/spark/deploy/history/EventFilter.scala | 6 +++--- .../spark/deploy/history/EventLogFileCompactor.scala | 8 +++----- .../spark/deploy/history/EventLogFileCompactorSuite.scala | 6 +++--- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala index fbb44efd7430b..a5f2394960b70 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala @@ -91,10 +91,10 @@ private[spark] object EventFilter extends Logging { event.foreach { e => val results = filters.flatMap(_.acceptFn().lift.apply(e)) - if (results.isEmpty || !results.contains(false)) { - onAccepted(line, e) - } else { + if (results.nonEmpty && results.forall(_ == false)) { onRejected(line, e) + } else { + onAccepted(line, e) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala index ef055ea288b65..80a0a7067a4e4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.Utils * represents approximate rate of filtered-out events. 
Score is being calculated via applying * heuristic; task events tend to take most size in event log. */ -private[spark] class EventLogFileCompactor( +class EventLogFileCompactor( sparkConf: SparkConf, hadoopConf: Configuration, fs: FileSystem) extends Logging { @@ -200,11 +200,9 @@ private[spark] class EventLogFileCompactor( * @param compactIndex The index of compact file if the compaction is successful. * Otherwise it will be None. */ -private[spark] case class CompactionResult( - code: CompactionResultCode.Value, - compactIndex: Option[Long]) +case class CompactionResult(code: CompactionResultCode.Value, compactIndex: Option[Long]) -private[spark] object CompactionResultCode extends Enumeration { +object CompactionResultCode extends Enumeration { val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala index 72da1da9fb600..866e610aab980 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -239,9 +239,9 @@ class EventLogFileCompactorSuite extends SparkFunSuite { // filterApplicationEnd: Some(true) & Some(true) => filter in expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0)) - // filterBlockManagerAdded: Some(true) & Some(false) => filter out - writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1), - 10)) + // filterBlockManagerAdded: Some(true) & Some(false) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerBlockManagerAdded( + 0, BlockManagerId("1", "host1", 1), 10)) // filterApplicationStart: Some(false) & Some(false) => filter out writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None))
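For reference, the accept/reject combination rule that these test comments describe (and that the final EventFilter change above encodes) is: an event is dropped only when at least one filter expresses an opinion and every opining filter answers false; in every other case the original line is written to the compact file as-is. Below is a minimal standalone Scala sketch of that rule; FilterCombineSketch, accept, and verdicts are illustrative names only and are not part of the patch.

    object FilterCombineSketch {
      // Each filter's verdict for an event: Some(true) keep, Some(false) reject, None abstain.
      def accept(verdicts: Seq[Option[Boolean]]): Boolean = {
        val opinions = verdicts.flatten
        // Reject only if somebody voted and every vote was false.
        !(opinions.nonEmpty && opinions.forall(_ == false))
      }

      def main(args: Array[String]): Unit = {
        assert(accept(Seq(Some(true), Some(true))))     // e.g. ApplicationEnd: kept
        assert(accept(Seq(Some(true), Some(false))))    // e.g. BlockManagerAdded: kept
        assert(!accept(Seq(Some(false), Some(false))))  // e.g. ApplicationStart: dropped
        assert(accept(Seq(None, Some(true))))           // e.g. NodeBlacklisted: kept
        assert(!accept(Seq(None, Some(false))))         // e.g. NodeUnblacklisted: dropped
        assert(accept(Seq(None, None)))                 // events no filter cares about: kept
      }
    }

Under this rule a single filter answering true is enough to retain an event, which is why SparkListenerBlockManagerAdded, receiving Some(true) from TestEventFilter1 and Some(false) from TestEventFilter2, is expected to survive compaction once the last commit flips that test comment to "filter in".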