From 8db5a06d108d8a2ddb8460e48e3509f46cc4fc2f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 27 Feb 2015 15:29:26 -0800 Subject: [PATCH 1/9] Embed metadata in the event log file name instead This makes the event logs much easier to parse than before. As of this commit the whole file is either entirely compressed or not compressed, but not somewhere in between. --- .../scala/org/apache/spark/SparkContext.scala | 9 ++ .../spark/deploy/ApplicationDescription.scala | 11 +- .../spark/deploy/client/TestClient.scala | 3 +- .../deploy/history/FsHistoryProvider.scala | 6 +- .../apache/spark/deploy/master/Master.scala | 4 +- .../scheduler/EventLoggingListener.scala | 147 ++++++++---------- .../spark/scheduler/ReplayListenerBus.scala | 5 +- .../spark/scheduler/SparkListener.scala | 6 + .../spark/scheduler/SparkListenerBus.scala | 1 + .../cluster/SparkDeploySchedulerBackend.scala | 4 +- .../org/apache/spark/util/JsonProtocol.scala | 3 + 11 files changed, 103 insertions(+), 96 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3cd0c218a36fd..c60c411d9cb76 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.executor.TriggerThreadDump import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat} +import org.apache.spark.io.CompressionCodec import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli None } } + private[spark] val eventLogCodec: Option[String] = { + val compress = conf.getBoolean("spark.eventLog.compress", false) + if (compress && isEventLogEnabled) { + Some(CompressionCodec.createCodec(conf)).map(_.getClass.getCanonicalName) + } else { + None + } + } // Generate the random name for a temp folder in Tachyon // Add a timestamp as the suffix here to make it more safe diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index ae55b4ff40b74..4815e423eb9a5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -23,7 +23,9 @@ private[spark] class ApplicationDescription( val memoryPerSlave: Int, val command: Command, var appUiUrl: String, - val eventLogDir: Option[String] = None) + val sparkVersion: String, + val eventLogDir: Option[String] = None, + val eventLogCodec: Option[String] = None) extends Serializable { val user = System.getProperty("user.name", "") @@ -34,8 +36,11 @@ private[spark] class ApplicationDescription( memoryPerSlave: Int = memoryPerSlave, command: Command = command, appUiUrl: String = appUiUrl, - eventLogDir: Option[String] = eventLogDir): ApplicationDescription = - new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir) + sparkVersion: String = sparkVersion, + eventLogDir: Option[String] = eventLogDir, + eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription = + new ApplicationDescription( + name, maxCores, memoryPerSlave, command, appUiUrl, sparkVersion, eventLogDir, eventLogCodec) override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index 88a0862b96afe..f1a11f66c911e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -49,7 +49,8 @@ private[spark] object TestClient { val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription("TestClient", Some(1), 512, - Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored") + Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), + "ignored", "1.2.3") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3e3d6ff29faf0..ae218ac66d468 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -81,9 +81,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis = new mutable.LinkedHashMap() // Constants used to parse Spark 1.0.0 log directories. - private[history] val LOG_PREFIX = "EVENT_LOG_" - private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_" - private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_" + private[history] val LOG_PREFIX = EventLoggingListener.EVENT_LOG_KEY + "_" + private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_" + private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_" private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" /** diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 8cc6ec1e8192c..6f39a3d8b916a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -737,13 +737,13 @@ private[spark] class Master( val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found" try { val eventLogFile = app.desc.eventLogDir - .map { dir => EventLoggingListener.getLogPath(dir, app.id) } + .map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) } .getOrElse { // Event logging is not enabled for this application app.desc.appUiUrl = notFoundBasePath return false } - + val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf) if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 30075c172bdb1..69a77e8b2ecbd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -62,6 +62,13 @@ private[spark] class EventLoggingListener( private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf) + private val compressionCodec = + if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf)) + } else { + None + } + private val compressionCodecName = compressionCodec.map(_.getClass.getCanonicalName) // Only defined if the file system scheme is not local private var hadoopDataStream: Option[FSDataOutputStream] = None @@ -80,7 +87,7 @@ private[spark] class EventLoggingListener( private[scheduler] val loggedEvents = new ArrayBuffer[JValue] // Visible for tests only. - private[scheduler] val logPath = getLogPath(logBaseDir, appId) + private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName) /** * Creates the log file in the configured log directory. @@ -110,17 +117,12 @@ private[spark] class EventLoggingListener( hadoopDataStream = Some(fileSystem.create(path)) hadoopDataStream.get } - - val compressionCodec = - if (shouldCompress) { - Some(CompressionCodec.createCodec(sparkConf)) - } else { - None - } + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) - val logStream = initEventLog(new BufferedOutputStream(dstream, outputBufferSize), - compressionCodec) + + val logStream = initEventLog(bstream, compressionCodec) writer = Some(new PrintWriter(logStream)) logInfo("Logging events to %s".format(logPath)) @@ -202,11 +204,11 @@ private[spark] object EventLoggingListener extends Logging { val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" - private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + val EVENT_LOG_KEY = "EVENT_LOG" + val SPARK_VERSION_KEY = "SPARK_VERSION" + val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC" - // Marker for the end of header data in a log file. After this marker, log data, potentially - // compressed, will be found. - private val HEADER_END_MARKER = "=== LOG_HEADER_END ===" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) // To avoid corrupted files causing the heap to fill up. Value is arbitrary. private val MAX_HEADER_LINE_LENGTH = 4096 @@ -217,53 +219,60 @@ private[spark] object EventLoggingListener extends Logging { /** * Write metadata about the event log to the given stream. * - * The header is a serialized version of a map, except it does not use Java serialization to - * avoid incompatibilities between different JDKs. It writes one map entry per line, in - * "key=value" format. - * - * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code - * can know when to stop. + * The header is a single line of JSON in the beginning of the file. Note that this + * assumes all metadata necessary to parse the log is also included in the file name. + * The format needs to be kept in sync with the `openEventLog()` method below. Also, it + * cannot change in new Spark versions without some other way of detecting the change. * - * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot - * change in new Spark versions without some other way of detecting the change (like some - * metadata encoded in the file name). - * - * @param logStream Raw output stream to the even log file. + * @param logStream Raw output stream to the event log file. * @param compressionCodec Optional compression codec to use. - * @return A stream where to write event log data. This may be a wrapper around the original + * @return A stream to which event log data is written. This may be a wrapper around the original * stream (for example, when compression is enabled). */ def initEventLog( logStream: OutputStream, compressionCodec: Option[CompressionCodec]): OutputStream = { - val meta = mutable.HashMap(("version" -> SPARK_VERSION)) + val metadata = new mutable.HashMap[String, String] + // Some of these metadata are already encoded in the file name + // Here we include them again within the file itself for completeness + metadata += ("Event" -> Utils.getFormattedClassName(SparkListenerMetadataIdentifier)) + metadata += (SPARK_VERSION_KEY -> SPARK_VERSION) compressionCodec.foreach { codec => - meta += ("compressionCodec" -> codec.getClass().getName()) + metadata += (COMPRESSION_CODEC_KEY -> codec.getClass.getCanonicalName) } - - def write(entry: String) = { - val bytes = entry.getBytes(Charsets.UTF_8) - if (bytes.length > MAX_HEADER_LINE_LENGTH) { - throw new IOException(s"Header entry too long: ${entry}") - } - logStream.write(bytes, 0, bytes.length) + val metadataJson = compact(render(JsonProtocol.mapToJson(metadata))) + val metadataBytes = (metadataJson + "\n").getBytes(Charsets.UTF_8) + if (metadataBytes.length > MAX_HEADER_LINE_LENGTH) { + throw new IOException(s"Event log metadata too long: $metadataJson") } - - meta.foreach { case (k, v) => write(s"$k=$v\n") } - write(s"$HEADER_END_MARKER\n") - compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream) + logStream.write(metadataBytes, 0, metadataBytes.length) + logStream } /** * Return a file-system-safe path to the log file for the given application. * + * Note that because we currently only create a single log file for each application, + * we must encode all the information needed to parse this event log in the file name + * instead of within the file itself. Otherwise, if the file is compressed, for instance, + * we won't know which codec to use to decompress the metadata. + * * @param logBaseDir Directory where the log file will be written. * @param appId A unique app ID. + * @param compressionCodecName Name of the compression codec used to compress the contents + * of the log, or None if compression is not enabled. * @return A path which consists of file-system-safe characters. */ - def getLogPath(logBaseDir: String, appId: String): String = { - val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase - Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") + def getLogPath( + logBaseDir: String, + appId: String, + compressionCodecName: Option[String]): String = { + val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase + // e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1 + // e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec + val logName = s"${EVENT_LOG_KEY}_${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" + + compressionCodecName.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }.getOrElse("") + Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName.stripSuffix("/") } /** @@ -279,51 +288,21 @@ private[spark] object EventLoggingListener extends Logging { } val in = new BufferedInputStream(fs.open(log)) - // Read a single line from the input stream without buffering. - // We cannot use BufferedReader because we must avoid reading - // beyond the end of the header, after which the content of the - // file may be compressed. - def readLine(): String = { - val bytes = new ByteArrayOutputStream() - var next = in.read() - var count = 0 - while (next != '\n') { - if (next == -1) { - throw new IOException("Unexpected end of file.") - } - bytes.write(next) - count = count + 1 - if (count > MAX_HEADER_LINE_LENGTH) { - throw new IOException("Maximum header line length exceeded.") - } - next = in.read() - } - new String(bytes.toByteArray(), Charsets.UTF_8) + + // Parse information from the log name + val logName = log.getName + val baseRegex = s"${EVENT_LOG_KEY}_(.*)_${SPARK_VERSION_KEY}_(.*)".r + val compressionRegex = (baseRegex + s"_${COMPRESSION_CODEC_KEY}_(.*)").r + val (sparkVersion, codecName) = logName match { + case compressionRegex(_, version, _codecName) => (version, Some(_codecName)) + case baseRegex(_, version) => (version, None) + case _ => throw new IllegalArgumentException(s"Malformed event log name: $logName") + } + val codec = codecName.map { c => + codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) } - // Parse the header metadata in the form of k=v pairs - // This assumes that every line before the header end marker follows this format try { - val meta = new mutable.HashMap[String, String]() - var foundEndMarker = false - while (!foundEndMarker) { - readLine() match { - case HEADER_END_MARKER => - foundEndMarker = true - case entry => - val prop = entry.split("=", 2) - if (prop.length != 2) { - throw new IllegalArgumentException("Invalid metadata in log file.") - } - meta += (prop(0) -> prop(1)) - } - } - - val sparkVersion = meta.get("version").getOrElse( - throw new IllegalArgumentException("Missing Spark version in log metadata.")) - val codec = meta.get("compressionCodec").map { codecName => - codecMap.getOrElseUpdate(codecName, CompressionCodec.createCodec(new SparkConf, codecName)) - } (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index d9c3a10dc5413..4702e6dede2c4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -49,7 +49,10 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { val lines = Source.fromInputStream(logData).getLines() lines.foreach { line => currentLine = line - postToAll(JsonProtocol.sparkEventFromJson(parse(line))) + JsonProtocol.sparkEventFromJson(parse(line)) match { + case SparkListenerMetadataIdentifier => // Ignore metadata for now + case event => postToAll(event) + } lineNumber += 1 } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index dd28ddb31de1f..5a0aa853e33ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -98,6 +98,12 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String) extends SparkListenerEvent +/** + * A special dummy event used to identify the metadata header in event logs. + * This is not actually posted anywhere. + */ +private[spark] case object SparkListenerMetadataIdentifier extends SparkListenerEvent + /** * Periodic updates from executors. * @param execId executor id diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index fe8a19a2c0cb9..40abb82ece8da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -58,6 +58,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => listener.onExecutorRemoved(executorRemoved) + case SparkListenerMetadataIdentifier => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index a0aa555f6244f..d3307312234ad 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.Semaphore -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, SPARK_VERSION} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} @@ -85,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend( args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir) + appUIAddress, SPARK_VERSION, sc.eventLogDir, sc.eventLogCodec) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8e20864db5673..d444f72baf5a5 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -91,6 +91,7 @@ private[spark] object JsonProtocol { executorRemovedToJson(executorRemoved) // These aren't used, but keeps compiler happy case SparkListenerExecutorMetricsUpdate(_, _) => JNothing + case SparkListenerMetadataIdentifier => JNothing } } @@ -447,6 +448,7 @@ private[spark] object JsonProtocol { val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd) val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded) val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) + val metadataIdentifier = Utils.getFormattedClassName(SparkListenerMetadataIdentifier) (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -464,6 +466,7 @@ private[spark] object JsonProtocol { case `applicationEnd` => applicationEndFromJson(json) case `executorAdded` => executorAddedFromJson(json) case `executorRemoved` => executorRemovedFromJson(json) + case `metadataIdentifier` => SparkListenerMetadataIdentifier } } From f32d8d25bb5460c2b9f8d2d812665572c8bcbdd4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 27 Feb 2015 17:08:26 -0800 Subject: [PATCH 2/9] Fix tests --- .../scheduler/EventLoggingListener.scala | 7 +++-- .../spark/deploy/JsonProtocolSuite.scala | 2 +- .../history/FsHistoryProviderSuite.scala | 25 ++++++++++------ .../deploy/worker/ExecutorRunnerTest.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 30 +++++++++---------- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 69a77e8b2ecbd..79e0b227f907d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -266,7 +266,7 @@ private[spark] object EventLoggingListener extends Logging { def getLogPath( logBaseDir: String, appId: String, - compressionCodecName: Option[String]): String = { + compressionCodecName: Option[String] = None): String = { val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase // e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1 // e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec @@ -276,7 +276,10 @@ private[spark] object EventLoggingListener extends Logging { } /** - * Opens an event log file and returns an input stream to the event data. + * Opens an event log file and returns an input stream that contains the event data. + * + * The first line of the returned input stream is a JSON header that describes the metadata + * of the event log. * * @return 2-tuple (event input stream, Spark version of event data) */ diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index e955636cf5b59..bd566dfb3209e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -90,7 +90,7 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc(): ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq()) - new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl") + new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl", "1.2.3") } def createAppInfo() : ApplicationInfo = { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 85939eaadccc7..5f6101975b4ec 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -18,17 +18,16 @@ package org.apache.spark.deploy.history import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.net.URI import scala.io.Source -import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.Matchers import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.util.{JsonProtocol, Utils} @@ -45,18 +44,26 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers Utils.deleteRecursively(testDir) } + /** Create a fake log file using the new log format used in Spark 1.3+ */ + private def newLogFile(appId: String, inProgress: Boolean = false): File = { + val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" + val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId) + val logPath = new URI(logUri).getPath + ip + new File(logPath) + } + test("Parse new and old application logs") { val provider = new FsHistoryProvider(createTestConf()) // Write a new-style application log. - val newAppComplete = new File(testDir, "new1") + val newAppComplete = newLogFile("new1", inProgress = false) writeFile(newAppComplete, true, None, SparkListenerApplicationStart("new-app-complete", None, 1L, "test"), SparkListenerApplicationEnd(4L) ) // Write an unfinished app, new-style. - val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS) + val newAppIncomplete = newLogFile("new2", inProgress = true) writeFile(newAppIncomplete, true, None, SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test") ) @@ -141,12 +148,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers } test("SPARK-3697: ignore directories that cannot be read.") { - val logFile1 = new File(testDir, "new1") + val logFile1 = newLogFile("new1") writeFile(logFile1, true, None, SparkListenerApplicationStart("app1-1", None, 1L, "test"), SparkListenerApplicationEnd(2L) ) - val logFile2 = new File(testDir, "new2") + val logFile2 = newLogFile("new2") writeFile(logFile2, true, None, SparkListenerApplicationStart("app1-2", None, 1L, "test"), SparkListenerApplicationEnd(2L) @@ -164,7 +171,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers test("history file is renamed from inprogress to completed") { val provider = new FsHistoryProvider(createTestConf()) - val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS) + val logFile1 = newLogFile("app1", inProgress = true) writeFile(logFile1, true, None, SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"), SparkListenerApplicationEnd(2L) @@ -174,7 +181,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers appListBeforeRename.size should be (1) appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS) - logFile1.renameTo(new File(testDir, "app1")) + logFile1.renameTo(newLogFile("app1", inProgress = false)) provider.checkForLogs() val appListAfterRename = provider.getListing() appListAfterRename.size should be (1) @@ -184,7 +191,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers test("SPARK-5582: empty log directory") { val provider = new FsHistoryProvider(createTestConf()) - val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS) + val logFile1 = newLogFile("app1", inProgress = true) writeFile(logFile1, true, None, SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"), SparkListenerApplicationEnd(2L)) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 76511699e5ac5..8a6cd8f70574b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -31,7 +31,7 @@ class ExecutorRunnerTest extends FunSuite { val appId = "12345-worker321-9876" val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") + Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl", "1.2.3") val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123, new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 437d8693c0b1f..4ab8860885279 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.io.{File, FileOutputStream, InputStream, IOException} +import java.net.URI import scala.collection.mutable import scala.io.Source @@ -93,16 +94,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin } test("Log overwriting") { - val log = new FileOutputStream(new File(testDir, "test")) - log.close() - try { - testEventLogging() - assert(false) - } catch { - case e: IOException => - // Expected, since we haven't enabled log overwrite. - } - + val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test") + val logPath = new URI(logUri).getPath + // Create file before writing the event log + new FileOutputStream(new File(logPath)).close() + // Expected IOException, since we haven't enabled log overwrite. + intercept[IOException] { testEventLogging() } // Try again, but enable overwriting. testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true")) } @@ -144,11 +141,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin fileSystem) try { val lines = readLines(logData) - assert(lines.size === 2) - assert(lines(0).contains("SparkListenerApplicationStart")) - assert(lines(1).contains("SparkListenerApplicationEnd")) - assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart) - assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd) + assert(lines.size === 3) // including the header + assert(lines(0).contains("SparkListenerMetadataIdentifier")) + assert(lines(1).contains("SparkListenerApplicationStart")) + assert(lines(2).contains("SparkListenerApplicationEnd")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart) + assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd) } finally { logData.close() } @@ -164,7 +162,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get val expectedLogDir = testDir.toURI().toString() - assert(eventLogger.logPath.startsWith(expectedLogDir + "/")) + assert(eventLogger.logPath.startsWith(expectedLogDir)) // Begin listening for events that trigger asserts val eventExistenceListener = new EventExistenceListener(eventLogger) From 88a091d55fbc06909b0c41f44bd41833a8238e89 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 27 Feb 2015 17:32:58 -0800 Subject: [PATCH 3/9] Add tests for new format and file name --- .../scheduler/EventLoggingListenerSuite.scala | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 4ab8860885279..d8b6ad4ec6837 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} -import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io._ import org.apache.spark.util.{JsonProtocol, Utils} @@ -104,6 +104,18 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true")) } + test("Event log name") { + // without compression + assert(s"file:/base-dir/EVENT_LOG_app1_SPARK_VERSION_$SPARK_VERSION" === + EventLoggingListener.getLogPath("/base-dir", "app1")) + // with compression + assert(s"file:/base-dir/EVENT_LOG_app1_SPARK_VERSION_${SPARK_VERSION}_COMPRESSION_CODEC_lzf" === + EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf"))) + // illegal characters in app ID + assert(s"file:/base-dir/EVENT_LOG_a-fine-mind_dollar_bills_1_SPARK_VERSION_$SPARK_VERSION" === + EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}1")) + } + /* ----------------- * * Actual test logic * * ----------------- */ @@ -145,6 +157,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin assert(lines(0).contains("SparkListenerMetadataIdentifier")) assert(lines(1).contains("SparkListenerApplicationStart")) assert(lines(2).contains("SparkListenerApplicationEnd")) + assertMetadataValid(lines(0), compressionCodec) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === SparkListenerMetadataIdentifier) assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart) assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd) } finally { @@ -152,6 +166,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin } } + /** + * Assert that the line is a correct JSON representation of the event log metadata. + */ + private def assertMetadataValid(line: String, compressionCodec: Option[String] = None): Unit = { + val metadata = JsonProtocol.mapFromJson(parse(line)) + assert(metadata.size === 2 + compressionCodec.size) + assert(metadata.get("Event") === Some(SparkListenerMetadataIdentifier.toString)) + assert(metadata.get(EventLoggingListener.SPARK_VERSION_KEY) === Some(SPARK_VERSION)) + assert(metadata.get(EventLoggingListener.COMPRESSION_CODEC_KEY) === compressionCodec) + } + /** * Test end-to-end event logging functionality in an application. * This runs a simple Spark job and asserts that the expected events are logged when expected. @@ -161,8 +186,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin val sc = new SparkContext("local-cluster[2,2,512]", "test", conf) assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get + val eventLogPath = eventLogger.logPath val expectedLogDir = testDir.toURI().toString() - assert(eventLogger.logPath.startsWith(expectedLogDir)) + assert(eventLogPath === EventLoggingListener.getLogPath( + expectedLogDir, sc.applicationId, compressionCodec)) // Begin listening for events that trigger asserts val eventExistenceListener = new EventExistenceListener(eventLogger) @@ -191,6 +218,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin SparkListenerTaskStart, SparkListenerTaskEnd, SparkListenerApplicationEnd).map(Utils.getFormattedClassName) + // Verify that the first line is valid metadata + assertMetadataValid(lines(0), compressionCodec) lines.foreach { line => eventSet.foreach { event => if (line.contains(event)) { From 519e51a958b40d193327e85b659e1df767041f55 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 27 Feb 2015 17:53:37 -0800 Subject: [PATCH 4/9] Address review feedback --- .../deploy/history/FsHistoryProvider.scala | 2 +- .../scheduler/EventLoggingListener.scala | 26 +++++++++++-------- .../scheduler/EventLoggingListenerSuite.scala | 6 ++--- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index ae218ac66d468..621429d62b49f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -81,7 +81,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis = new mutable.LinkedHashMap() // Constants used to parse Spark 1.0.0 log directories. - private[history] val LOG_PREFIX = EventLoggingListener.EVENT_LOG_KEY + "_" + private[history] val LOG_PREFIX = "EVENT_LOG_" private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_" private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_" private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 79e0b227f907d..a279ec78486e3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -117,15 +117,21 @@ private[spark] class EventLoggingListener( hadoopDataStream = Some(fileSystem.create(path)) hadoopDataStream.get } - val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) - val bstream = new BufferedOutputStream(cstream, outputBufferSize) - fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) + try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) - val logStream = initEventLog(bstream, compressionCodec) - writer = Some(new PrintWriter(logStream)) + fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) - logInfo("Logging events to %s".format(logPath)) + val logStream = initEventLog(bstream, compressionCodec) + writer = Some(new PrintWriter(logStream)) + logInfo("Logging events to %s".format(logPath)) + } catch { + case e: Exception => + dstream.close() + throw e + } } /** Log the event as JSON. */ @@ -203,8 +209,6 @@ private[spark] object EventLoggingListener extends Logging { // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" - - val EVENT_LOG_KEY = "EVENT_LOG" val SPARK_VERSION_KEY = "SPARK_VERSION" val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC" @@ -270,9 +274,9 @@ private[spark] object EventLoggingListener extends Logging { val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase // e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1 // e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec - val logName = s"${EVENT_LOG_KEY}_${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" + + val logName = s"${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" + compressionCodecName.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }.getOrElse("") - Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName.stripSuffix("/") + Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName } /** @@ -294,7 +298,7 @@ private[spark] object EventLoggingListener extends Logging { // Parse information from the log name val logName = log.getName - val baseRegex = s"${EVENT_LOG_KEY}_(.*)_${SPARK_VERSION_KEY}_(.*)".r + val baseRegex = s"(.*)_${SPARK_VERSION_KEY}_(.*)".r val compressionRegex = (baseRegex + s"_${COMPRESSION_CODEC_KEY}_(.*)").r val (sparkVersion, codecName) = logName match { case compressionRegex(_, version, _codecName) => (version, Some(_codecName)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index d8b6ad4ec6837..1bea5f1de07f4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -106,13 +106,13 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin test("Event log name") { // without compression - assert(s"file:/base-dir/EVENT_LOG_app1_SPARK_VERSION_$SPARK_VERSION" === + assert(s"file:/base-dir/app1_SPARK_VERSION_$SPARK_VERSION" === EventLoggingListener.getLogPath("/base-dir", "app1")) // with compression - assert(s"file:/base-dir/EVENT_LOG_app1_SPARK_VERSION_${SPARK_VERSION}_COMPRESSION_CODEC_lzf" === + assert(s"file:/base-dir/app1_SPARK_VERSION_${SPARK_VERSION}_COMPRESSION_CODEC_lzf" === EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf"))) // illegal characters in app ID - assert(s"file:/base-dir/EVENT_LOG_a-fine-mind_dollar_bills_1_SPARK_VERSION_$SPARK_VERSION" === + assert(s"file:/base-dir/a-fine-mind_dollar_bills_1_SPARK_VERSION_$SPARK_VERSION" === EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}1")) } From 27c9a6c1c5546810307bec0b2bffde6f7bd4c1a0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 1 Mar 2015 21:55:31 -0800 Subject: [PATCH 5/9] Address review feedback Things changed in this commit: (1) No more metadata in log content (2) No more Spark version in log file name (3) Use short name for compression codec in log file name --- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/deploy/ApplicationDescription.scala | 5 +- .../spark/deploy/client/TestClient.scala | 3 +- .../deploy/history/FsHistoryProvider.scala | 16 ++-- .../apache/spark/deploy/master/Master.scala | 4 +- .../apache/spark/io/CompressionCodec.scala | 22 ++++- .../scheduler/EventLoggingListener.scala | 82 +++++-------------- .../spark/scheduler/ReplayListenerBus.scala | 8 +- .../spark/scheduler/SparkListener.scala | 6 -- .../spark/scheduler/SparkListenerBus.scala | 1 - .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala | 3 - .../spark/deploy/JsonProtocolSuite.scala | 2 +- .../history/FsHistoryProviderSuite.scala | 11 +-- .../deploy/worker/ExecutorRunnerTest.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 43 +++------- .../spark/scheduler/ReplayListenerSuite.scala | 13 +-- 17 files changed, 74 insertions(+), 151 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c60c411d9cb76..e231e8369dbac 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -237,7 +237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] val eventLogCodec: Option[String] = { val compress = conf.getBoolean("spark.eventLog.compress", false) if (compress && isEventLogEnabled) { - Some(CompressionCodec.createCodec(conf)).map(_.getClass.getCanonicalName) + Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName) } else { None } diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 4815e423eb9a5..3d0d68de8f495 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -23,8 +23,8 @@ private[spark] class ApplicationDescription( val memoryPerSlave: Int, val command: Command, var appUiUrl: String, - val sparkVersion: String, val eventLogDir: Option[String] = None, + // short name of compression codec used when writing event logs, if any (e.g. lzf) val eventLogCodec: Option[String] = None) extends Serializable { @@ -36,11 +36,10 @@ private[spark] class ApplicationDescription( memoryPerSlave: Int = memoryPerSlave, command: Command = command, appUiUrl: String = appUiUrl, - sparkVersion: String = sparkVersion, eventLogDir: Option[String] = eventLogDir, eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription = new ApplicationDescription( - name, maxCores, memoryPerSlave, command, appUiUrl, sparkVersion, eventLogDir, eventLogCodec) + name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index f1a11f66c911e..88a0862b96afe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -49,8 +49,7 @@ private[spark] object TestClient { val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription("TestClient", Some(1), 512, - Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), - "ignored", "1.2.3") + Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 621429d62b49f..86ba59f5c48fe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -291,7 +291,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") - val (logInput, sparkVersion) = + val logInput = if (isLegacyLogDirectory(eventLog)) { openLegacyEventLog(logPath) } else { @@ -300,7 +300,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis try { val appListener = new ApplicationEventListener bus.addListener(appListener) - bus.replay(logInput, sparkVersion, logPath.toString) + bus.replay(logInput, logPath.toString) new FsApplicationHistoryInfo( logPath.getName(), appListener.appId.getOrElse(logPath.getName()), @@ -322,28 +322,22 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * * @return 2-tuple of (input stream of the events, version of Spark which wrote the log) */ - private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = { + private[history] def openLegacyEventLog(dir: Path): InputStream = { val children = fs.listStatus(dir) var eventLogPath: Path = null var codecName: Option[String] = None - var sparkVersion: String = null children.foreach { child => child.getPath().getName() match { case name if name.startsWith(LOG_PREFIX) => eventLogPath = child.getPath() - case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) => codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length())) - - case version if version.startsWith(SPARK_VERSION_PREFIX) => - sparkVersion = version.substring(SPARK_VERSION_PREFIX.length()) - case _ => } } - if (eventLogPath == null || sparkVersion == null) { + if (eventLogPath == null) { throw new IllegalArgumentException(s"$dir is not a Spark application log directory.") } @@ -355,7 +349,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } val in = new BufferedInputStream(fs.open(eventLogPath)) - (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion) + codec.map(_.compressedInputStream(in)).getOrElse(in) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 6f39a3d8b916a..148485cc11863 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -756,12 +756,12 @@ private[spark] class Master( return false } - val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs) + val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs) val replayBus = new ReplayListenerBus() val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}") try { - replayBus.replay(logInput, sparkVersion, eventLogFile) + replayBus.replay(logInput, eventLogFile) } finally { logInput.close() } diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index f856890d279f4..187d1cf5ad4a9 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -26,7 +26,6 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils -import org.apache.spark.Logging /** * :: DeveloperApi :: @@ -53,8 +52,12 @@ private[spark] object CompressionCodec { "lzf" -> classOf[LZFCompressionCodec].getName, "snappy" -> classOf[SnappyCompressionCodec].getName) + def getCodecName(conf: SparkConf): String = { + conf.get(configKey, DEFAULT_COMPRESSION_CODEC) + } + def createCodec(conf: SparkConf): CompressionCodec = { - createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC)) + createCodec(conf, getCodecName(conf)) } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { @@ -71,6 +74,21 @@ private[spark] object CompressionCodec { s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")) } + /** + * Return the short version of the given codec name. + * If it is already a short name, just return it. + */ + def getShortName(codecName: String): String = { + if (shortCompressionCodecNames.contains(codecName)) { + codecName + } else { + shortCompressionCodecNames + .collect { case (k, v) if v == codecName => k } + .headOption + .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") } + } + } + val FALLBACK_COMPRESSION_CODEC = "lzf" val DEFAULT_COMPRESSION_CODEC = "snappy" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a279ec78486e3..1157560e8aef5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -23,14 +23,13 @@ import java.net.URI import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{Logging, SparkConf, SPARK_VERSION} +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} @@ -68,7 +67,9 @@ private[spark] class EventLoggingListener( } else { None } - private val compressionCodecName = compressionCodec.map(_.getClass.getCanonicalName) + private val compressionCodecName = compressionCodec.map { c => + CompressionCodec.getShortName(c.getClass.getName) + } // Only defined if the file system scheme is not local private var hadoopDataStream: Option[FSDataOutputStream] = None @@ -121,11 +122,8 @@ private[spark] class EventLoggingListener( try { val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) val bstream = new BufferedOutputStream(cstream, outputBufferSize) - fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) - - val logStream = initEventLog(bstream, compressionCodec) - writer = Some(new PrintWriter(logStream)) + writer = Some(new PrintWriter(bstream)) logInfo("Logging events to %s".format(logPath)) } catch { case e: Exception => @@ -214,56 +212,21 @@ private[spark] object EventLoggingListener extends Logging { private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) - // To avoid corrupted files causing the heap to fill up. Value is arbitrary. - private val MAX_HEADER_LINE_LENGTH = 4096 - // A cache for compression codecs to avoid creating the same codec many times private val codecMap = new mutable.HashMap[String, CompressionCodec] - /** - * Write metadata about the event log to the given stream. - * - * The header is a single line of JSON in the beginning of the file. Note that this - * assumes all metadata necessary to parse the log is also included in the file name. - * The format needs to be kept in sync with the `openEventLog()` method below. Also, it - * cannot change in new Spark versions without some other way of detecting the change. - * - * @param logStream Raw output stream to the event log file. - * @param compressionCodec Optional compression codec to use. - * @return A stream to which event log data is written. This may be a wrapper around the original - * stream (for example, when compression is enabled). - */ - def initEventLog( - logStream: OutputStream, - compressionCodec: Option[CompressionCodec]): OutputStream = { - val metadata = new mutable.HashMap[String, String] - // Some of these metadata are already encoded in the file name - // Here we include them again within the file itself for completeness - metadata += ("Event" -> Utils.getFormattedClassName(SparkListenerMetadataIdentifier)) - metadata += (SPARK_VERSION_KEY -> SPARK_VERSION) - compressionCodec.foreach { codec => - metadata += (COMPRESSION_CODEC_KEY -> codec.getClass.getCanonicalName) - } - val metadataJson = compact(render(JsonProtocol.mapToJson(metadata))) - val metadataBytes = (metadataJson + "\n").getBytes(Charsets.UTF_8) - if (metadataBytes.length > MAX_HEADER_LINE_LENGTH) { - throw new IOException(s"Event log metadata too long: $metadataJson") - } - logStream.write(metadataBytes, 0, metadataBytes.length) - logStream - } - /** * Return a file-system-safe path to the log file for the given application. * * Note that because we currently only create a single log file for each application, * we must encode all the information needed to parse this event log in the file name * instead of within the file itself. Otherwise, if the file is compressed, for instance, - * we won't know which codec to use to decompress the metadata. + * we won't know which codec to use to decompress the metadata needed to open the file in + * the first place. * * @param logBaseDir Directory where the log file will be written. * @param appId A unique app ID. - * @param compressionCodecName Name of the compression codec used to compress the contents + * @param compressionCodecName Name to identify the codec used to compress the contents * of the log, or None if compression is not enabled. * @return A path which consists of file-system-safe characters. */ @@ -272,22 +235,19 @@ private[spark] object EventLoggingListener extends Logging { appId: String, compressionCodecName: Option[String] = None): String = { val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase - // e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1 - // e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec - val logName = s"${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" + - compressionCodecName.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }.getOrElse("") + // e.g. app_123, app_123_COMPRESSION_CODEC_lzf + val logName = sanitizedAppId + compressionCodecName + .map { c => s"_${COMPRESSION_CODEC_KEY}_$c" } + .getOrElse("") Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName } /** * Opens an event log file and returns an input stream that contains the event data. * - * The first line of the returned input stream is a JSON header that describes the metadata - * of the event log. - * - * @return 2-tuple (event input stream, Spark version of event data) + * @return input stream that holds one JSON serialized event per line */ - def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = { + def openEventLog(log: Path, fs: FileSystem): InputStream = { // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain // IOException when a file does not exist, so try our best to throw a proper exception. if (!fs.exists(log)) { @@ -296,21 +256,19 @@ private[spark] object EventLoggingListener extends Logging { val in = new BufferedInputStream(fs.open(log)) - // Parse information from the log name + // Parse compression codec from the log name val logName = log.getName - val baseRegex = s"(.*)_${SPARK_VERSION_KEY}_(.*)".r - val compressionRegex = (baseRegex + s"_${COMPRESSION_CODEC_KEY}_(.*)").r - val (sparkVersion, codecName) = logName match { - case compressionRegex(_, version, _codecName) => (version, Some(_codecName)) - case baseRegex(_, version) => (version, None) - case _ => throw new IllegalArgumentException(s"Malformed event log name: $logName") + val compressionRegex = s".*_${COMPRESSION_CODEC_KEY}_(.*)".r + val codecName: Option[String] = logName match { + case compressionRegex(_codecName) => Some(_codecName) + case _ => None } val codec = codecName.map { c => codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) } try { - (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion) + codec.map(_.compressedInputStream(in)).getOrElse(in) } catch { case e: Exception => in.close() diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 4702e6dede2c4..95273c716b3e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -39,20 +39,16 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { * error is thrown by this method. * * @param logData Stream containing event log data. - * @param version Spark version that generated the events. * @param sourceName Filename (or other source identifier) from whence @logData is being read */ - def replay(logData: InputStream, version: String, sourceName: String) { + def replay(logData: InputStream, sourceName: String): Unit = { var currentLine: String = null var lineNumber: Int = 1 try { val lines = Source.fromInputStream(logData).getLines() lines.foreach { line => currentLine = line - JsonProtocol.sparkEventFromJson(parse(line)) match { - case SparkListenerMetadataIdentifier => // Ignore metadata for now - case event => postToAll(event) - } + postToAll(JsonProtocol.sparkEventFromJson(parse(line))) lineNumber += 1 } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 5a0aa853e33ae..dd28ddb31de1f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -98,12 +98,6 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String) extends SparkListenerEvent -/** - * A special dummy event used to identify the metadata header in event logs. - * This is not actually posted anywhere. - */ -private[spark] case object SparkListenerMetadataIdentifier extends SparkListenerEvent - /** * Periodic updates from executors. * @param execId executor id diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 40abb82ece8da..fe8a19a2c0cb9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -58,7 +58,6 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => listener.onExecutorRemoved(executorRemoved) - case SparkListenerMetadataIdentifier => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index d3307312234ad..84fc640e4c098 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -85,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend( args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, SPARK_VERSION, sc.eventLogDir, sc.eventLogCodec) + appUIAddress, sc.eventLogDir, sc.eventLogCodec) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d444f72baf5a5..8e20864db5673 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -91,7 +91,6 @@ private[spark] object JsonProtocol { executorRemovedToJson(executorRemoved) // These aren't used, but keeps compiler happy case SparkListenerExecutorMetricsUpdate(_, _) => JNothing - case SparkListenerMetadataIdentifier => JNothing } } @@ -448,7 +447,6 @@ private[spark] object JsonProtocol { val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd) val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded) val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) - val metadataIdentifier = Utils.getFormattedClassName(SparkListenerMetadataIdentifier) (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -466,7 +464,6 @@ private[spark] object JsonProtocol { case `applicationEnd` => applicationEndFromJson(json) case `executorAdded` => executorAddedFromJson(json) case `executorRemoved` => executorRemovedFromJson(json) - case `metadataIdentifier` => SparkListenerMetadataIdentifier } } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index bd566dfb3209e..e955636cf5b59 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -90,7 +90,7 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc(): ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq()) - new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl", "1.2.3") + new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl") } def createAppInfo() : ApplicationInfo = { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 5f6101975b4ec..a595f50af21f7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -134,7 +134,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers val logPath = new Path(logDir.getAbsolutePath()) try { - val (logInput, sparkVersion) = provider.openLegacyEventLog(logPath) + val logInput = provider.openLegacyEventLog(logPath) try { Source.fromInputStream(logInput).getLines().toSeq.size should be (2) } finally { @@ -206,13 +206,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec], events: SparkListenerEvent*) = { - val out = - if (isNewFormat) { - EventLoggingListener.initEventLog(new FileOutputStream(file), codec) - } else { - val fileStream = new FileOutputStream(file) - codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream) - } + val fileStream = new FileOutputStream(file) + val out = codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream) val writer = new OutputStreamWriter(out, "UTF-8") try { events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n")) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 8a6cd8f70574b..76511699e5ac5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -31,7 +31,7 @@ class ExecutorRunnerTest extends FunSuite { val appId = "12345-worker321-9876" val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl", "1.2.3") + Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123, new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 1bea5f1de07f4..595a1c66d7fc9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} -import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION} +import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io._ import org.apache.spark.util.{JsonProtocol, Utils} @@ -106,13 +106,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin test("Event log name") { // without compression - assert(s"file:/base-dir/app1_SPARK_VERSION_$SPARK_VERSION" === - EventLoggingListener.getLogPath("/base-dir", "app1")) + assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1")) // with compression - assert(s"file:/base-dir/app1_SPARK_VERSION_${SPARK_VERSION}_COMPRESSION_CODEC_lzf" === + assert(s"file:/base-dir/app1_COMPRESSION_CODEC_lzf" === EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf"))) // illegal characters in app ID - assert(s"file:/base-dir/a-fine-mind_dollar_bills_1_SPARK_VERSION_$SPARK_VERSION" === + assert(s"file:/base-dir/a-fine-mind_dollar_bills_1" === EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}1")) } @@ -149,34 +148,19 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin eventLogger.stop() // Verify file contains exactly the two events logged - val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), - fileSystem) + val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) try { val lines = readLines(logData) - assert(lines.size === 3) // including the header - assert(lines(0).contains("SparkListenerMetadataIdentifier")) - assert(lines(1).contains("SparkListenerApplicationStart")) - assert(lines(2).contains("SparkListenerApplicationEnd")) - assertMetadataValid(lines(0), compressionCodec) - assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === SparkListenerMetadataIdentifier) - assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart) - assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd) + assert(lines.size === 2) + assert(lines(0).contains("SparkListenerApplicationStart")) + assert(lines(1).contains("SparkListenerApplicationEnd")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart) + assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd) } finally { logData.close() } } - /** - * Assert that the line is a correct JSON representation of the event log metadata. - */ - private def assertMetadataValid(line: String, compressionCodec: Option[String] = None): Unit = { - val metadata = JsonProtocol.mapFromJson(parse(line)) - assert(metadata.size === 2 + compressionCodec.size) - assert(metadata.get("Event") === Some(SparkListenerMetadataIdentifier.toString)) - assert(metadata.get(EventLoggingListener.SPARK_VERSION_KEY) === Some(SPARK_VERSION)) - assert(metadata.get(EventLoggingListener.COMPRESSION_CODEC_KEY) === compressionCodec) - } - /** * Test end-to-end event logging functionality in an application. * This runs a simple Spark job and asserts that the expected events are logged when expected. @@ -189,7 +173,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin val eventLogPath = eventLogger.logPath val expectedLogDir = testDir.toURI().toString() assert(eventLogPath === EventLoggingListener.getLogPath( - expectedLogDir, sc.applicationId, compressionCodec)) + expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName))) // Begin listening for events that trigger asserts val eventExistenceListener = new EventExistenceListener(eventLogger) @@ -203,8 +187,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin eventExistenceListener.assertAllCallbacksInvoked() // Make sure expected events exist in the log file. - val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), - fileSystem) + val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) val lines = readLines(logData) val eventSet = mutable.Set( SparkListenerApplicationStart, @@ -218,8 +201,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin SparkListenerTaskStart, SparkListenerTaskEnd, SparkListenerApplicationEnd).map(Utils.getFormattedClassName) - // Verify that the first line is valid metadata - assertMetadataValid(lines(0), compressionCodec) lines.foreach { line => eventSet.foreach { event => if (line.contains(event)) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 702c4cb3bdef9..601694f57aad0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) - replayer.replay(logData, SPARK_VERSION, logFilePath.toString) + replayer.replay(logData, logFilePath.toString) } finally { logData.close() } @@ -115,12 +115,12 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { assert(!eventLog.isDir) // Replay events - val (logData, version) = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) + val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) val eventMonster = new EventMonster(conf) try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) - replayer.replay(logData, version, eventLog.getPath().toString) + replayer.replay(logData, eventLog.getPath().toString) } finally { logData.close() } @@ -150,11 +150,4 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { override def start() { } } - - private def getCompressionCodec(codecName: String) = { - val conf = new SparkConf - conf.set("spark.io.compression.codec", codecName) - CompressionCodec.createCodec(conf) - } - } From 7d6aa61c59a784d6f62fee94c9d7f4c6e0f501f9 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 1 Mar 2015 22:51:45 -0800 Subject: [PATCH 6/9] Make codec an extension New log name looks like: app123.lzf --- .../deploy/history/FsHistoryProvider.scala | 2 +- .../scheduler/EventLoggingListener.scala | 19 +++++++------------ .../scheduler/EventLoggingListenerSuite.scala | 13 ++++++++----- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 86ba59f5c48fe..062dbde1780f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -320,7 +320,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * log file (along with other metadata files), which is the case for directories generated by * the code in previous releases. * - * @return 2-tuple of (input stream of the events, version of Spark which wrote the log) + * @return input stream that holds one JSON serialized event per line */ private[history] def openLegacyEventLog(dir: Path): InputStream = { val children = fs.listStatus(dir) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 1157560e8aef5..5e7e9a4b77f68 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -234,11 +234,9 @@ private[spark] object EventLoggingListener extends Logging { logBaseDir: String, appId: String, compressionCodecName: Option[String] = None): String = { - val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase - // e.g. app_123, app_123_COMPRESSION_CODEC_lzf - val logName = sanitizedAppId + compressionCodecName - .map { c => s"_${COMPRESSION_CODEC_KEY}_$c" } - .getOrElse("") + val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase + // e.g. app_123, app_123.lzf + val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("") Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName } @@ -256,13 +254,10 @@ private[spark] object EventLoggingListener extends Logging { val in = new BufferedInputStream(fs.open(log)) - // Parse compression codec from the log name - val logName = log.getName - val compressionRegex = s".*_${COMPRESSION_CODEC_KEY}_(.*)".r - val codecName: Option[String] = logName match { - case compressionRegex(_codecName) => Some(_codecName) - case _ => None - } + // Compression codec is encoded as an extension, e.g. app_123.lzf + // Since we sanitize the app ID to not include periods, it is safe to split on it + val logName = log.getName.replaceAll(IN_PROGRESS, "") + val codecName: Option[String] = logName.split("\\.").tail.lastOption val codec = codecName.map { c => codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 595a1c66d7fc9..48f7d500db5c9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -79,7 +79,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin test("Basic event logging with compression") { CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec => - testEventLogging(compressionCodec = Some(codec)) + testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec))) } } @@ -89,7 +89,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin test("End-to-end event logging with compression") { CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec => - testApplicationEventLogging(compressionCodec = Some(codec)) + testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec))) } } @@ -108,11 +108,14 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin // without compression assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1")) // with compression - assert(s"file:/base-dir/app1_COMPRESSION_CODEC_lzf" === + assert(s"file:/base-dir/app1.lzf" === EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf"))) // illegal characters in app ID - assert(s"file:/base-dir/a-fine-mind_dollar_bills_1" === - EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}1")) + assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" === + EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1")) + // illegal characters in app ID with compression + assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" === + EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4"))) } /* ----------------- * From 7f537cda6e68f922889f5b95e2869ec770f038a8 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 2 Mar 2015 11:14:46 -0800 Subject: [PATCH 7/9] Address review feedback --- .../deploy/history/FsHistoryProvider.scala | 2 +- .../apache/spark/io/CompressionCodec.scala | 3 +- .../scheduler/EventLoggingListener.scala | 4 +-- .../history/FsHistoryProviderSuite.scala | 29 +++++++++++++------ 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 062dbde1780f1..dec5dc704364e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -320,7 +320,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * log file (along with other metadata files), which is the case for directories generated by * the code in previous releases. * - * @return input stream that holds one JSON serialized event per line + * @return input stream that holds one JSON record per line */ private[history] def openLegacyEventLog(dir: Path): InputStream = { val children = fs.listStatus(dir) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 187d1cf5ad4a9..0709b6d689e86 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -83,8 +83,7 @@ private[spark] object CompressionCodec { codecName } else { shortCompressionCodecNames - .collect { case (k, v) if v == codecName => k } - .headOption + .collectFirst { case (k, v) if v == codecName => k } .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 5e7e9a4b77f68..c10566fc305c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -243,7 +243,7 @@ private[spark] object EventLoggingListener extends Logging { /** * Opens an event log file and returns an input stream that contains the event data. * - * @return input stream that holds one JSON serialized event per line + * @return input stream that holds one JSON record per line */ def openEventLog(log: Path, fs: FileSystem): InputStream = { // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain @@ -256,7 +256,7 @@ private[spark] object EventLoggingListener extends Logging { // Compression codec is encoded as an extension, e.g. app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.replaceAll(IN_PROGRESS, "") + val logName = log.getName.stripSuffix(IN_PROGRESS) val codecName: Option[String] = logName.split("\\.").tail.lastOption val codec = codecName.map { c => codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index a595f50af21f7..eec43812f87ba 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -45,7 +45,10 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers } /** Create a fake log file using the new log format used in Spark 1.3+ */ - private def newLogFile(appId: String, inProgress: Boolean = false): File = { + private def newLogFile( + appId: String, + inProgress: Boolean, + codec: Option[String] = None): File = { val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId) val logPath = new URI(logUri).getPath + ip @@ -62,6 +65,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers SparkListenerApplicationEnd(4L) ) + // Write a new-style application log. + val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf")) + writeFile(newAppCompressedComplete, true, None, + SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"), + SparkListenerApplicationEnd(4L)) + // Write an unfinished app, new-style. val newAppIncomplete = newLogFile("new2", inProgress = true) writeFile(newAppIncomplete, true, None, @@ -96,16 +105,18 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers val list = provider.getListing().toSeq list should not be (null) - list.size should be (4) - list.count(e => e.completed) should be (2) + list.size should be (5) + list.count(_.completed) should be (3) - list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L, + list(0) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(), + "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) + list(1) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L, newAppComplete.lastModified(), "test", true)) - list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L, + list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L, oldAppComplete.lastModified(), "test", true)) - list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, + list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L, oldAppIncomplete.lastModified(), "test", false)) - list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, + list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L, newAppIncomplete.lastModified(), "test", false)) // Make sure the UI can be rendered. @@ -148,12 +159,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers } test("SPARK-3697: ignore directories that cannot be read.") { - val logFile1 = newLogFile("new1") + val logFile1 = newLogFile("new1", inProgress = false) writeFile(logFile1, true, None, SparkListenerApplicationStart("app1-1", None, 1L, "test"), SparkListenerApplicationEnd(2L) ) - val logFile2 = newLogFile("new2") + val logFile2 = newLogFile("new2", inProgress = false) writeFile(logFile2, true, None, SparkListenerApplicationStart("app1-2", None, 1L, "test"), SparkListenerApplicationEnd(2L) From 654883dfbd65de162455601c69108dab4e354f7d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 2 Mar 2015 12:41:00 -0800 Subject: [PATCH 8/9] Add back metadata with Spark version --- .../deploy/history/FsHistoryProvider.scala | 2 +- .../scheduler/EventLoggingListener.scala | 19 +++++++++++++++++-- .../spark/scheduler/SparkListener.scala | 5 +++++ .../spark/scheduler/SparkListenerBus.scala | 1 + .../org/apache/spark/util/JsonProtocol.scala | 14 ++++++++++++++ .../history/FsHistoryProviderSuite.scala | 18 +++++++++++------- .../scheduler/EventLoggingListenerSuite.scala | 17 +++++++++++------ 7 files changed, 60 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index dec5dc704364e..958202fce107a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -320,7 +320,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * log file (along with other metadata files), which is the case for directories generated by * the code in previous releases. * - * @return input stream that holds one JSON record per line + * @return input stream that holds one JSON record per line. */ private[history] def openLegacyEventLog(dir: Path): InputStream = { val children = fs.listStatus(dir) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index c10566fc305c9..2091a9fe8d0d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -23,13 +23,14 @@ import java.net.URI import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import com.google.common.base.Charsets import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SparkConf, SPARK_VERSION} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} @@ -122,6 +123,8 @@ private[spark] class EventLoggingListener( try { val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) val bstream = new BufferedOutputStream(cstream, outputBufferSize) + + EventLoggingListener.initEventLog(bstream) fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) writer = Some(new PrintWriter(bstream)) logInfo("Logging events to %s".format(logPath)) @@ -215,6 +218,18 @@ private[spark] object EventLoggingListener extends Logging { // A cache for compression codecs to avoid creating the same codec many times private val codecMap = new mutable.HashMap[String, CompressionCodec] + /** + * Write metadata about an event log to the given stream. + * The metadata is encoded in the first line of the event log as JSON. + * + * @param logStream Raw output stream to the event log file. + */ + def initEventLog(logStream: OutputStream): Unit = { + val metadata = SparkListenerLogStart(SPARK_VERSION) + val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" + logStream.write(metadataJson.getBytes(Charsets.UTF_8)) + } + /** * Return a file-system-safe path to the log file for the given application. * @@ -243,7 +258,7 @@ private[spark] object EventLoggingListener extends Logging { /** * Opens an event log file and returns an input stream that contains the event data. * - * @return input stream that holds one JSON record per line + * @return input stream that holds one JSON record per line. */ def openEventLog(log: Path, fs: FileSystem): InputStream = { // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index dd28ddb31de1f..52720d48ca67f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -116,6 +116,11 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String], @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent +/** + * An internal class that describes the metadata of an event log. + * This event is not meant to be posted to listeners downstream. + */ +private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index fe8a19a2c0cb9..61e69ecc08387 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -58,6 +58,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => listener.onExecutorRemoved(executorRemoved) + case logStart: SparkListenerLogStart => // ignore event log metadata } } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8e20864db5673..f2561c8510d31 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -89,6 +89,8 @@ private[spark] object JsonProtocol { executorAddedToJson(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => executorRemovedToJson(executorRemoved) + case logStart: SparkListenerLogStart => + logStartToJson(logStart) // These aren't used, but keeps compiler happy case SparkListenerExecutorMetricsUpdate(_, _) => JNothing } @@ -214,6 +216,11 @@ private[spark] object JsonProtocol { ("Removed Reason" -> executorRemoved.reason) } + def logStartToJson(logStart: SparkListenerLogStart): JValue = { + ("Event" -> Utils.getFormattedClassName(logStart)) ~ + ("Spark Version" -> SPARK_VERSION) + } + /** ------------------------------------------------------------------- * * JSON serialization methods for classes SparkListenerEvents depend on | * -------------------------------------------------------------------- */ @@ -447,6 +454,7 @@ private[spark] object JsonProtocol { val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd) val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded) val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) + val logStart = Utils.getFormattedClassName(SparkListenerLogStart) (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -464,6 +472,7 @@ private[spark] object JsonProtocol { case `applicationEnd` => applicationEndFromJson(json) case `executorAdded` => executorAddedFromJson(json) case `executorRemoved` => executorRemovedFromJson(json) + case `logStart` => logStartFromJson(json) } } @@ -574,6 +583,11 @@ private[spark] object JsonProtocol { SparkListenerExecutorRemoved(time, executorId, reason) } + def logStartFromJson(json: JValue): SparkListenerLogStart = { + val version = (json \ "Spark Version").extract[String] + SparkListenerLogStart(version) + } + /** --------------------------------------------------------------------- * * JSON deserialization methods for classes SparkListenerEvents depend on | * ---------------------------------------------------------------------- */ diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index eec43812f87ba..921471dbe53c1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter} import java.net.URI import scala.io.Source @@ -108,10 +108,10 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers list.size should be (5) list.count(_.completed) should be (3) - list(0) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(), - "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) - list(1) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L, + list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L, newAppComplete.lastModified(), "test", true)) + list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(), + "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L, oldAppComplete.lastModified(), "test", true)) list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, @@ -217,9 +217,13 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec], events: SparkListenerEvent*) = { - val fileStream = new FileOutputStream(file) - val out = codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream) - val writer = new OutputStreamWriter(out, "UTF-8") + val fstream = new FileOutputStream(file) + val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) + val bstream = new BufferedOutputStream(cstream) + if (isNewFormat) { + EventLoggingListener.initEventLog(new FileOutputStream(file)) + } + val writer = new OutputStreamWriter(bstream, "UTF-8") try { events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n")) } finally { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 48f7d500db5c9..992dde66f982f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} -import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io._ import org.apache.spark.util.{JsonProtocol, Utils} @@ -154,11 +154,14 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) try { val lines = readLines(logData) - assert(lines.size === 2) - assert(lines(0).contains("SparkListenerApplicationStart")) - assert(lines(1).contains("SparkListenerApplicationEnd")) - assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart) - assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd) + val logStart = SparkListenerLogStart(SPARK_VERSION) + assert(lines.size === 3) + assert(lines(0).contains("SparkListenerLogStart")) + assert(lines(1).contains("SparkListenerApplicationStart")) + assert(lines(2).contains("SparkListenerApplicationEnd")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart) + assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd) } finally { logData.close() } @@ -191,6 +194,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin // Make sure expected events exist in the log file. val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val logStart = SparkListenerLogStart(SPARK_VERSION) val lines = readLines(logData) val eventSet = mutable.Set( SparkListenerApplicationStart, @@ -215,6 +219,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin } } } + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) } From 8511141f6b769655acc2b0c1de33a8401c8f7133 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 2 Mar 2015 15:00:28 -0800 Subject: [PATCH 9/9] Fix test --- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4 ++-- .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index f2561c8510d31..474f79fb756f6 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -584,8 +584,8 @@ private[spark] object JsonProtocol { } def logStartFromJson(json: JValue): SparkListenerLogStart = { - val version = (json \ "Spark Version").extract[String] - SparkListenerLogStart(version) + val sparkVersion = (json \ "Spark Version").extract[String] + SparkListenerLogStart(sparkVersion) } /** --------------------------------------------------------------------- * diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 921471dbe53c1..e908ba604ebed 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -62,7 +62,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers val newAppComplete = newLogFile("new1", inProgress = false) writeFile(newAppComplete, true, None, SparkListenerApplicationStart("new-app-complete", None, 1L, "test"), - SparkListenerApplicationEnd(4L) + SparkListenerApplicationEnd(5L) ) // Write a new-style application log. @@ -108,7 +108,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers list.size should be (5) list.count(_.completed) should be (3) - list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L, + list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L, newAppComplete.lastModified(), "test", true)) list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(), "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))