diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8ce4b91cae8ae..6fafe916a5404 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark
 
+import java.io.File
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.util.Utils
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -46,6 +48,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   private val settings = new HashMap[String, String]()
 
   if (loadDefaults) {
+    loadDefaultProperties()
     // Load any spark.* system properties
     for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
       settings(k) = v
@@ -291,4 +294,25 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def toDebugString: String = {
     settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
+  def loadDefaultProperties() {
+    var seq = Seq[(String, String)]()
+
+    val defaultConfigFile = "spark-defaults.conf"
+    Option(Utils.getSparkClassLoader.getResourceAsStream("/spark-defaults.conf")).foreach { in =>
+      seq = seq ++ Utils.getPropertiesFromInputStream(in)
+    }
+
+    sys.env.get("SPARK_HOME").foreach { sparkHome =>
+      val file = new File(Seq(sparkHome, "conf", defaultConfigFile).mkString(File.separator))
+      if (file.exists() && file.isFile()) {
+        seq = seq ++ Utils.getPropertiesFromFile(file)
+      }
+    }
+
+    for ((k, v) <- seq if k.startsWith("spark.")) {
+      settings(k) = v
+    }
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1032ea8dbada..d8ba028d3a014 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.File
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
@@ -67,7 +66,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   Option(propertiesFile).foreach { filename =>
     val file = new File(filename)
     SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-      if (k.startsWith("spark")) {
+      if (k.startsWith("spark.")) {
         defaultProperties(k) = v
         if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
       } else {
@@ -383,20 +382,5 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
 object SparkSubmitArguments {
   /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
-    }
-  }
+  def getPropertiesFromFile(file: File) = Utils.getPropertiesFromFile(file)
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a2454e120a8ab..ec229e2b0d3f4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import java.io._
 import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
 import java.nio.ByteBuffer
-import java.util.{Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
 import scala.collection.JavaConversions._
@@ -1286,4 +1286,27 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
+    val inputStream = new FileInputStream(file)
+    getPropertiesFromInputStream(inputStream)
+  }
+
+  /** Load properties present in the given inputStream. */
+  def getPropertiesFromInputStream(inputStream: InputStream): Seq[(String, String)] = {
+    try {
+      val properties = new Properties()
+      properties.load(inputStream)
+      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
+    } catch {
+      case e: IOException =>
+        val message = s"Failed when loading Spark properties"
+        throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
+    }
+  }
+
 }
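For context, here is a minimal sketch (not part of the patch) of how the new default loading behaves once this change is applied. The object name SparkDefaultsCheck and the property names and values mentioned in the comments are illustrative assumptions, not anything defined by this diff.

// Illustrative only: exercises the patched SparkConf default loading.
// Assumes $SPARK_HOME/conf/spark-defaults.conf contains (example values):
//   spark.executor.memory   2g
//   spark.eventLog.enabled  true
import org.apache.spark.SparkConf

object SparkDefaultsCheck {
  def main(args: Array[String]): Unit = {
    // Simulate a property passed on the JVM command line as -Dspark.executor.memory=4g.
    sys.props("spark.executor.memory") = "4g"

    // With loadDefaults = true, loadDefaultProperties() reads the classpath resource and
    // $SPARK_HOME/conf/spark-defaults.conf first; the spark.* system properties are applied
    // afterwards, so the system property wins for spark.executor.memory.
    val conf = new SparkConf()
    println(conf.toDebugString)
    // Expected under the assumptions above:
    //   spark.eventLog.enabled=true
    //   spark.executor.memory=4g
  }
}

Because loadDefaultProperties() runs before the spark.* system-property loop in the SparkConf constructor, values from spark-defaults.conf act only as defaults, and explicit system properties (or later set() calls) still take precedence.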