From 3150ed61a40d0c5fdd1f324ee7e557e47d3b7fd0 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Thu, 24 Apr 2014 13:55:31 -0500
Subject: [PATCH 1/5] SPARK-1557 Set permissions on event log files/directories

---
 .../scheduler/EventLoggingListener.scala       | 12 ++++++----
 .../org/apache/spark/util/FileLogger.scala     | 24 +++++++++++++------
 docs/security.md                               |  2 ++
 3 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 2fe65cd944b67..e822ff6bbed85 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -51,10 +51,11 @@ private[spark] class EventLoggingListener(
   private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
+  val LOG_FILE_PERMISSIONS: FsPermission = FsPermission.createImmutable(0770: Short)
 
   private val logger =
     new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite)
+      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
 
   /**
    * Begin logging events.
    */
@@ -64,10 +65,11 @@ private[spark] class EventLoggingListener(
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
       val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
-      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+      logger.newFile(COMPRESSION_CODEC_PREFIX + codec, Some(LOG_FILE_PERMISSIONS))
     }
-    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
-    logger.newFile(LOG_PREFIX + logger.fileIndex)
+    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION,
+      Some(LOG_FILE_PERMISSIONS))
+    logger.newFile(LOG_PREFIX + logger.fileIndex, Some(LOG_FILE_PERMISSIONS))
   }
 
   /** Log the event as JSON. */
@@ -114,7 +116,7 @@ private[spark] class EventLoggingListener(
    * In addition, create an empty special file to indicate application completion.
    */
   def stop() = {
-    logger.newFile(APPLICATION_COMPLETE)
+    logger.newFile(APPLICATION_COMPLETE, Some(LOG_FILE_PERMISSIONS))
     logger.stop()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 7d47b2a72aff7..1b2265a65e6e0 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io.CompressionCodec
+import org.apache.hadoop.fs.permission.FsPermission
 
 /**
  * A generic class for logging information to file.
@@ -42,7 +43,8 @@ private[spark] class FileLogger(
     hadoopConfiguration: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
-    overwrite: Boolean = true)
+    overwrite: Boolean = true,
+    dirPermissions: Option[FsPermission] = None)
   extends Logging {
 
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -60,12 +62,12 @@ private[spark] class FileLogger(
 
   private var writer: Option[PrintWriter] = None
 
-  createLogDir()
+  createLogDir(dirPermissions)
 
   /**
    * Create a logging directory with the given path.
    */
-  private def createLogDir() {
+  private def createLogDir(dirPerms: Option[FsPermission]) {
     val path = new Path(logDir)
     if (fileSystem.exists(path)) {
       if (overwrite) {
@@ -79,16 +81,23 @@ private[spark] class FileLogger(
     if (!fileSystem.mkdirs(path)) {
       throw new IOException("Error in creating log directory: %s".format(logDir))
     }
+    if (dirPerms.isDefined) {
+      val fsStatus = fileSystem.getFileStatus(path)
+      if (fsStatus.getPermission().toShort() != dirPerms.get.toShort()) {
+        fileSystem.setPermission(path, dirPerms.get);
+      }
+    }
   }
 
   /**
    * Create a new writer for the file identified by the given path.
    */
-  private def createWriter(fileName: String): PrintWriter = {
+  private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
     val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
     val isDefaultLocal = (defaultFs == null || defaultFs == "file")
+    val path = new Path(logPath)
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -97,11 +106,11 @@ private[spark] class FileLogger(
       // Second parameter is whether to append
       new FileOutputStream(uri.getPath, !overwrite)
     } else {
-      val path = new Path(logPath)
       hadoopDataStream = Some(fileSystem.create(path, overwrite))
       hadoopDataStream.get
     }
 
+    if (perms.isDefined) fileSystem.setPermission(path, perms.get)
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -150,15 +159,16 @@ private[spark] class FileLogger(
   /**
    * Start a writer for a new file, closing the existing one if it exists.
    * @param fileName Name of the new file, defaulting to the file index if not provided.
+   * @param perms Permissions to put on the new file.
    */
-  def newFile(fileName: String = "") {
+  def newFile(fileName: String = "", perms: Option[FsPermission] = None) {
     fileIndex += 1
     writer.foreach(_.close())
     val name = fileName match {
       case "" => fileIndex.toString
       case _ => fileName
     }
-    writer = Some(createWriter(name))
+    writer = Some(createWriter(name, perms))
   }
 
   /**
diff --git a/docs/security.md b/docs/security.md
index 9e4218fbcfe7d..90c69915f517f 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -7,6 +7,8 @@ Spark currently supports authentication via a shared secret. Authentication can
 
 The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user, and once the user is logged in, Spark can compare that user against the view acls to make sure they are authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' control the behavior of the acls. Note that the person who started the application always has view access to the UI.
 
+If your applications use event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created with the proper permissions set on it. To keep the log files secure, the permissions on that directory should be set to drwxrwxrwxt. The owner of the directory should be the super user who runs the history server, and the group should be restricted to the super user's group. This allows all users to write to the directory, but prevents unprivileged users from removing or renaming a file unless they own the file or directory. The event log files are created by Spark with permissions such that only the user and group have read and write access.
+
 For Spark on Yarn deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
 
 For other types of Spark deployments, the spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
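The security.md paragraph added above asks administrators to pre-create the event log directory with drwxrwxrwxt (octal 1777) permissions before enabling event logging. Below is a minimal sketch of how that one-time setup could be scripted against the Hadoop FileSystem API; it is not part of the patch, the object name and path are hypothetical, and running `hadoop fs -chmod 1777` on the directory achieves the same thing.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

object CreateEventLogDir {
  def main(args: Array[String]) {
    val conf = new Configuration()
    val eventLogDir = new Path("/tmp/spark-events") // the value of spark.eventLog.dir
    val fs = FileSystem.get(eventLogDir.toUri, conf)
    if (!fs.exists(eventLogDir)) {
      fs.mkdirs(eventLogDir)
    }
    // 1777 = rwxrwxrwt: world-writable, with the sticky bit preventing users
    // from deleting or renaming files they do not own. setPermission is called
    // explicitly because permissions given to mkdirs are subject to the umask.
    fs.setPermission(eventLogDir, new FsPermission(Integer.parseInt("1777", 8).toShort))
  }
}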
*/ - private def createLogDir() { + private def createLogDir(dirPerms: Option[FsPermission]) { val path = new Path(logDir) if (fileSystem.exists(path)) { if (overwrite) { @@ -79,16 +81,23 @@ private[spark] class FileLogger( if (!fileSystem.mkdirs(path)) { throw new IOException("Error in creating log directory: %s".format(logDir)) } + if (dirPerms.isDefined) { + val fsStatus = fileSystem.getFileStatus(path) + if (fsStatus.getPermission().toShort() != dirPerms.get.toShort()) { + fileSystem.setPermission(path, dirPerms.get); + } + } } /** * Create a new writer for the file identified by the given path. */ - private def createWriter(fileName: String): PrintWriter = { + private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = { val logPath = logDir + "/" + fileName val uri = new URI(logPath) val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme val isDefaultLocal = (defaultFs == null || defaultFs == "file") + val path = new Path(logPath) /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). * Therefore, for local files, use FileOutputStream instead. */ @@ -97,11 +106,11 @@ private[spark] class FileLogger( // Second parameter is whether to append new FileOutputStream(uri.getPath, !overwrite) } else { - val path = new Path(logPath) hadoopDataStream = Some(fileSystem.create(path, overwrite)) hadoopDataStream.get } + if (perms.isDefined) fileSystem.setPermission(path, perms.get) val bstream = new BufferedOutputStream(dstream, outputBufferSize) val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream new PrintWriter(cstream) @@ -150,15 +159,16 @@ private[spark] class FileLogger( /** * Start a writer for a new file, closing the existing one if it exists. * @param fileName Name of the new file, defaulting to the file index if not provided. + * @param perms Permissions to put on the new file. */ - def newFile(fileName: String = "") { + def newFile(fileName: String = "", perms: Option[FsPermission] = None) { fileIndex += 1 writer.foreach(_.close()) val name = fileName match { case "" => fileIndex.toString case _ => fileName } - writer = Some(createWriter(name)) + writer = Some(createWriter(name, perms)) } /** diff --git a/docs/security.md b/docs/security.md index 9e4218fbcfe7d..90c69915f517f 100644 --- a/docs/security.md +++ b/docs/security.md @@ -7,6 +7,8 @@ Spark currently supports authentication via a shared secret. Authentication can The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view acls to make sure they are authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' control the behavior of the acls. Note that the person who started the application always has view access to the UI. +If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secure, the permissions should be set to drwxrwxrwxt for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. 
From 3ca9b797f7c69599fe2642375665c29f7326adf1 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 25 Apr 2014 08:21:30 -0500
Subject: [PATCH 3/5] Updated based on comments

---
 .../scala/org/apache/spark/util/FileLogger.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 1b2265a65e6e0..f33017e298d87 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -24,10 +24,10 @@ import java.util.Date
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io.CompressionCodec
-import org.apache.hadoop.fs.permission.FsPermission
 
 /**
  * A generic class for logging information to file.
@@ -62,12 +62,12 @@ private[spark] class FileLogger(
 
   private var writer: Option[PrintWriter] = None
 
-  createLogDir(dirPermissions)
+  createLogDir()
 
   /**
    * Create a logging directory with the given path.
    */
-  private def createLogDir(dirPerms: Option[FsPermission]) {
+  private def createLogDir() {
     val path = new Path(logDir)
     if (fileSystem.exists(path)) {
       if (overwrite) {
@@ -81,10 +81,10 @@ private[spark] class FileLogger(
     if (!fileSystem.mkdirs(path)) {
       throw new IOException("Error in creating log directory: %s".format(logDir))
     }
-    if (dirPerms.isDefined) {
+    if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPerms.get.toShort()) {
-        fileSystem.setPermission(path, dirPerms.get);
+      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort()) {
+        fileSystem.setPermission(path, dirPermissions.get);
       }
     }
   }
@@ -110,7 +110,7 @@ private[spark] class FileLogger(
       hadoopDataStream.get
     }
 
-    if (perms.isDefined) fileSystem.setPermission(path, perms.get)
+    perms.foreach {p => fileSystem.setPermission(path, p)}
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
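Besides dropping the now-unused parameter, patch 3 swaps an `isDefined`/`get` pair for `Option.foreach` when applying file permissions. A tiny standalone illustration of why the two forms are equivalent (hypothetical values, pasteable into the Scala REPL; not Spark code):

// Before: explicit test plus .get
val perms: Option[String] = Some("rwxrwx---")
if (perms.isDefined) println("applying " + perms.get)

// After: foreach runs its body only when the Option is non-empty,
// so there is no .get call that could throw if perms were None
perms.foreach { p => println("applying " + p) }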
*/ - private def createLogDir(dirPerms: Option[FsPermission]) { + private def createLogDir() { val path = new Path(logDir) if (fileSystem.exists(path)) { if (overwrite) { @@ -81,10 +81,10 @@ private[spark] class FileLogger( if (!fileSystem.mkdirs(path)) { throw new IOException("Error in creating log directory: %s".format(logDir)) } - if (dirPerms.isDefined) { + if (dirPermissions.isDefined) { val fsStatus = fileSystem.getFileStatus(path) - if (fsStatus.getPermission().toShort() != dirPerms.get.toShort()) { - fileSystem.setPermission(path, dirPerms.get); + if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort()) { + fileSystem.setPermission(path, dirPermissions.get); } } } @@ -110,7 +110,7 @@ private[spark] class FileLogger( hadoopDataStream.get } - if (perms.isDefined) fileSystem.setPermission(path, perms.get) + perms.foreach {p => fileSystem.setPermission(path, p)} val bstream = new BufferedOutputStream(dstream, outputBufferSize) val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream new PrintWriter(cstream) From d8b6620ee48959cb548f9179d6c442d23f5cfa56 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 25 Apr 2014 14:33:44 -0500 Subject: [PATCH 4/5] update use of octal --- .../org/apache/spark/scheduler/EventLoggingListener.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 12db744ec8027..042b2a563aac4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -52,7 +52,8 @@ private[spark] class EventLoggingListener( private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/") private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis val logDir = logBaseDir + "/" + name - val LOG_FILE_PERMISSIONS: FsPermission = FsPermission.createImmutable(0770: Short) + val LOG_FILE_PERMISSIONS: FsPermission = + FsPermission.createImmutable(Integer.parseInt("770", 8).toShort: Short) private val logger = new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress, From e471d8e7a92f008e628e5fa39c13f96cee5b0e2b Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 28 Apr 2014 09:29:37 -0500 Subject: [PATCH 5/5] rework --- .../spark/scheduler/EventLoggingListener.scala | 14 +++++++------- .../scala/org/apache/spark/util/FileLogger.scala | 8 +++++--- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 042b2a563aac4..d822a8e55111a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -52,8 +52,6 @@ private[spark] class EventLoggingListener( private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/") private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis val logDir = logBaseDir + "/" + name - val LOG_FILE_PERMISSIONS: FsPermission = - FsPermission.createImmutable(Integer.parseInt("770", 8).toShort: Short) private val logger = new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress, @@ -67,11 
From e471d8e7a92f008e628e5fa39c13f96cee5b0e2b Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Mon, 28 Apr 2014 09:29:37 -0500
Subject: [PATCH 5/5] rework

---
 .../spark/scheduler/EventLoggingListener.scala   | 14 +++++++-------
 .../scala/org/apache/spark/util/FileLogger.scala |  8 +++++---
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 042b2a563aac4..d822a8e55111a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -52,8 +52,6 @@ private[spark] class EventLoggingListener(
   private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
-  val LOG_FILE_PERMISSIONS: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort: Short)
 
   private val logger =
     new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
@@ -67,11 +65,10 @@ private[spark] class EventLoggingListener(
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
       val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
-      logger.newFile(COMPRESSION_CODEC_PREFIX + codec, Some(LOG_FILE_PERMISSIONS))
+      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
-    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION,
-      Some(LOG_FILE_PERMISSIONS))
-    logger.newFile(LOG_PREFIX + logger.fileIndex, Some(LOG_FILE_PERMISSIONS))
+    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
+    logger.newFile(LOG_PREFIX + logger.fileIndex)
   }
 
   /** Log the event as JSON. */
@@ -118,7 +115,7 @@ private[spark] class EventLoggingListener(
    * In addition, create an empty special file to indicate application completion.
    */
   def stop() = {
-    logger.newFile(APPLICATION_COMPLETE, Some(LOG_FILE_PERMISSIONS))
+    logger.newFile(APPLICATION_COMPLETE)
     logger.stop()
   }
 }
@@ -128,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging {
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+  val LOG_FILE_PERMISSIONS: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
+
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index f33017e298d87..581c03f1332d3 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -83,14 +83,16 @@ private[spark] class FileLogger(
     }
     if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort()) {
-        fileSystem.setPermission(path, dirPermissions.get);
+      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+        fileSystem.setPermission(path, dirPermissions.get)
       }
     }
   }
 
   /**
    * Create a new writer for the file identified by the given path.
+   * If no permissions are passed in, this defaults to the permissions
+   * (dirPermissions) given when the class was instantiated.
    */
   private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
@@ -110,7 +112,7 @@ private[spark] class FileLogger(
       hadoopDataStream.get
     }
 
-    perms.foreach {p => fileSystem.setPermission(path, p)}
+    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
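The final rework resolves permissions with `perms.orElse(dirPermissions)`: an explicit per-file permission wins, otherwise the logger falls back to the directory permissions it was constructed with, and if neither is present no setPermission call is made. A minimal sketch of that fallback rule in isolation (hypothetical names, pasteable into the Scala REPL; not part of the patch):

import org.apache.hadoop.fs.permission.FsPermission

// Mirrors the resolution order in createWriter: file perms first, then dir perms
def resolve(perms: Option[FsPermission],
    dirPermissions: Option[FsPermission]): Option[FsPermission] =
  perms.orElse(dirPermissions)

val dirPerms = Some(new FsPermission(Integer.parseInt("770", 8).toShort))
val filePerms = Some(new FsPermission(Integer.parseInt("660", 8).toShort))

println(resolve(filePerms, dirPerms)) // Some(rw-rw----): explicit file perms win
println(resolve(None, dirPerms))      // Some(rwxrwx---): falls back to dir perms
println(resolve(None, None))          // None: no setPermission call happens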