core/src/main/scala/org/apache/spark/SparkConf.scala (24 additions, 0 deletions)
@@ -17,8 +17,10 @@
 
 package org.apache.spark
 
+import java.io.File
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import org.apache.spark.util.Utils
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -46,6 +48,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   private val settings = new HashMap[String, String]()
 
   if (loadDefaults) {
+    loadDefaultProperties()
     // Load any spark.* system properties
     for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
       settings(k) = v
@@ -291,4 +294,25 @@
   def toDebugString: String = {
     settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
+  def loadDefaultProperties() {
+    var seq = Seq[(String, String)]()
+
+    val defaultConfigFile = "spark-defaults.conf"
[Review comment, Contributor]
I think this will end up causing disagreement with spark-submit options.

spark-submit only loads spark-defaults.conf if --properties-file is not given on the command line. It seems like this code will always load spark-defaults.conf if one exists.
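For comparison, a minimal sketch of the precedence the reviewer describes; parsedPropertiesFile is a hypothetical stand-in for the parsed --properties-file argument, not spark-submit's actual variable, and the imports are assumed to match the surrounding file:

    // Sketch only: under spark-submit's rules, spark-defaults.conf is a
    // fallback, consulted solely when --properties-file was not passed.
    val parsedPropertiesFile: Option[String] = None  // filled in from the CLI
    val effectiveDefaults: Seq[(String, String)] = parsedPropertiesFile
      .map(path => Utils.getPropertiesFromFile(new File(path)))
      .orElse {
        sys.env.get("SPARK_HOME")
          .map(home => new File(Seq(home, "conf", "spark-defaults.conf").mkString(File.separator)))
          .filter(f => f.exists() && f.isFile())
          .map(Utils.getPropertiesFromFile)
      }
      .getOrElse(Seq.empty)
    // loadDefaultProperties() above, by contrast, merges spark-defaults.conf
    // unconditionally whenever the classpath resource or the file exists.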

+    Option(Utils.getSparkClassLoader.getResourceAsStream("/spark-defaults.conf")).foreach { in =>
+      seq = seq ++ Utils.getPropertiesFromInputStream(in)
+    }
+
+    sys.env.get("SPARK_HOME").foreach { sparkHome =>
+      val file = new File(Seq(sparkHome, "conf", defaultConfigFile).mkString(File.separator))
+      if (file.exists() && file.isFile()) {
+        seq = seq ++ Utils.getPropertiesFromFile(file)
+      }
+    }
+
+    for ((k, v) <- seq if k.startsWith("spark.")) {
+      settings(k) = v
+    }
+  }
+
 }
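A consequence of the constructor ordering worth noting: loadDefaultProperties() runs before the spark.* system properties are copied in, so a system property overrides a value from spark-defaults.conf. A small illustration, assuming a spark-defaults.conf that sets spark.executor.memory to something other than 4g:

    import org.apache.spark.SparkConf

    System.setProperty("spark.executor.memory", "4g")
    val conf = new SparkConf()           // loadDefaults = true
    conf.get("spark.executor.memory")    // returns "4g": the system property wins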
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.File
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
@@ -67,7 +66,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     Option(propertiesFile).foreach { filename =>
       val file = new File(filename)
       SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
+        if (k.startsWith("spark.")) {
           defaultProperties(k) = v
           if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
         } else {
@@ -383,20 +382,5 @@

 object SparkSubmitArguments {
   /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
-    }
-  }
+  def getPropertiesFromFile(file: File) = Utils.getPropertiesFromFile(file)
[Review comment, Contributor]
At this point, might as well remove the object.

 }
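If the object were removed as the reviewer suggests, the loadDefaults call site shown earlier could go through Utils directly. A sketch of that call site, with the warning branch for non-spark.* keys elided:

    Option(propertiesFile).foreach { filename =>
      Utils.getPropertiesFromFile(new File(filename)).foreach { case (k, v) =>
        if (k.startsWith("spark.")) {
          defaultProperties(k) = v
          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
        }
      }
    }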
core/src/main/scala/org/apache/spark/util/Utils.scala (24 additions, 1 deletion)
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import java.io._
 import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
 import java.nio.ByteBuffer
-import java.util.{Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
 import scala.collection.JavaConversions._
@@ -1286,4 +1286,27 @@
     }
   }
 
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
+    val inputStream = new FileInputStream(file)
+    getPropertiesFromInputStream(inputStream)
+  }
+
+  /** Load properties present in the given input stream. */
+  def getPropertiesFromInputStream(inputStream: InputStream): Seq[(String, String)] = {
+    try {
+      val properties = new Properties()
+      properties.load(inputStream)
+      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
+    } catch {
+      case e: IOException =>
+        val message = s"Failed when loading Spark properties"
+        throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
+    }
+  }
+
 }
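A minimal usage sketch for the two new helpers; the file path is an assumption, and since Utils is private[spark] this only compiles inside an org.apache.spark package:

    import java.io.File
    import org.apache.spark.util.Utils

    // require() in getPropertiesFromFile throws if the file is missing
    // or is not a regular file; the stream variant closes its input.
    val props: Seq[(String, String)] =
      Utils.getPropertiesFromFile(new File("conf/spark-defaults.conf"))
    props.filter { case (k, _) => k.startsWith("spark.") }
      .foreach { case (k, v) => println(s"$k=$v") }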