From 135343cf45890c9868bf428a13399a8cad0dd604 Mon Sep 17 00:00:00 2001
From: witgo
Date: Thu, 26 Jun 2014 23:50:47 +0800
Subject: [PATCH 1/2] Loading spark-defaults.conf when creating SparkConf instances

---
 .../scala/org/apache/spark/SparkConf.scala    | 25 +++++++++++++++++++
 .../spark/deploy/SparkSubmitArguments.scala   | 20 ++-------------
 .../scala/org/apache/spark/util/Utils.scala   | 25 ++++++++++++++++++-
 3 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8ce4b91cae8ae..1b676c2a07f57 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark
 
+import java.io.File
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import org.apache.spark.util.Utils
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -46,6 +48,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   private val settings = new HashMap[String, String]()
 
   if (loadDefaults) {
+    loadDefaultProperties()
     // Load any spark.* system properties
     for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
       settings(k) = v
@@ -291,4 +294,26 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def toDebugString: String = {
     settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
+  def loadDefaultProperties() {
+    var seq = Seq[(String, String)]()
+
+    val defaultConfigFile = "spark-defaults.conf"
+    Option(Utils.getSparkClassLoader.getResourceAsStream("/spark-defaults.conf")).foreach { in =>
+      seq = seq ++ Utils.getPropertiesFromInputStream(in)
+    }
+
+    sys.env.get("SPARK_HOME").foreach { sparkHome =>
+      val file = new File(Seq(sparkHome, "conf", defaultConfigFile).mkString(File.separator))
+      if (file.exists() && file.isFile()) {
+        seq = seq ++ Utils.getPropertiesFromFile(file)
+
+      }
+    }
+
+    for ((k, v) <- seq if k.startsWith("spark.")) {
+      settings(k) = v
+    }
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1032ea8dbada..66914d43d5325 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.File
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
@@ -383,20 +382,5 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
 object SparkSubmitArguments {
   /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
-    }
-  }
+  def getPropertiesFromFile(file: File) = Utils.getPropertiesFromFile(file)
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a2454e120a8ab..f5e773405fdde 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import java.io._
 import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
 import java.nio.ByteBuffer
-import java.util.{Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
 import scala.collection.JavaConversions._
@@ -1286,4 +1286,27 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
+    val inputStream = new FileInputStream(file)
+    getPropertiesFromInputStream(inputStream)
+  }
+
+  /** Load properties present in the given inputStream. */
+  def getPropertiesFromInputStream(inputStream: InputStream): Seq[(String, String)] = {
+    try {
+      val properties = new Properties()
+      properties.load(inputStream)
+      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
+    } catch {
+      case e: IOException =>
+        val message = s"Failed when loading Spark properties file $file"
+        throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
+    }
+  }
+
 }

From 3ec5479f18b31e44ddbafb15991b1e630b4df21a Mon Sep 17 00:00:00 2001
From: witgo
Date: Fri, 27 Jun 2014 00:00:24 +0800
Subject: [PATCH 2/2] review commit

---
 core/src/main/scala/org/apache/spark/SparkConf.scala          | 1 -
 .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala  | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala         | 2 +-
 3 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 1b676c2a07f57..6fafe916a5404 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -307,7 +306,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
       val file = new File(Seq(sparkHome, "conf", defaultConfigFile).mkString(File.separator))
       if (file.exists() && file.isFile()) {
         seq = seq ++ Utils.getPropertiesFromFile(file)
-
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 66914d43d5325..d8ba028d3a014 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -66,7 +66,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     Option(propertiesFile).foreach { filename =>
       val file = new File(filename)
       SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
+        if (k.startsWith("spark.")) {
           defaultProperties(k) = v
           if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
         } else {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f5e773405fdde..ec229e2b0d3f4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1302,7 +1302,7 @@ private[spark] object Utils extends Logging {
       properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
     } catch {
       case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
+        val message = s"Failed when loading Spark properties"
         throw new SparkException(message, e)
     } finally {
       inputStream.close()
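
For illustration only, not part of the patch series above: a minimal sketch of how the new
defaults-loading path could be exercised once these changes are applied. With the patch, any
SparkConf built with loadDefaults = true (the no-arg constructor) merges spark-defaults.conf
from the classpath and then from $SPARK_HOME/conf, and loads spark.* system properties
afterwards, so system properties take precedence for the same key. The example file contents
and the spark.executor.memory key below are assumptions, not taken from the patch.

    // Assumed contents of $SPARK_HOME/conf/spark-defaults.conf (example only):
    //   spark.executor.memory   2g

    import org.apache.spark.SparkConf

    object SparkDefaultsSketch {
      def main(args: Array[String]): Unit = {
        // The no-arg constructor uses loadDefaults = true, so with this patch the new
        // instance first merges spark-defaults.conf (classpath, then $SPARK_HOME/conf),
        // and then applies any spark.* system properties on top.
        val conf = new SparkConf()
        println(conf.getOption("spark.executor.memory"))        // Some(2g) if the file is present

        // System properties are loaded after the defaults file, so they win for the same key.
        System.setProperty("spark.executor.memory", "4g")
        println(new SparkConf().get("spark.executor.memory"))   // 4g
      }
    }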