From 89a7f8b910565dca34d0099cbf9f5f90d66afa91 Mon Sep 17 00:00:00 2001
From: Pat McDonough
Date: Wed, 9 Apr 2014 22:14:25 -0700
Subject: [PATCH 1/3] Added a new parameter, spark.system.reservedMemorySize,
 to reserve a portion of the heap for system objects, especially in the case
 of smaller heaps (like the out-of-the-box configuration for the Spark shell).

---
 .../apache/spark/storage/BlockManager.scala   |  2 +-
 .../scala/org/apache/spark/util/Utils.scala   |  9 ++++++++
 .../collection/ExternalAppendOnlyMap.scala    |  3 ++-
 docs/configuration.md                         | 23 ++++++++++++-------
 docs/tuning.md                                | 13 ++++++-----
 5 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a2a729130091f..e4432b6fe093a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1045,7 +1045,7 @@ private[spark] object BlockManager extends Logging {

   def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+    (Utils.effectiveMaxMemory(conf) * memoryFraction).toLong
   }

   def getHeartBeatFrequency(conf: SparkConf): Long =
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 59da51f3e0297..aa81ac147ce88 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1022,4 +1022,13 @@ private[spark] object Utils extends Logging {
   def getHadoopFileSystem(path: URI): FileSystem = {
     FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
   }
+
+  /**
+   * Determine the system's effective maximum memory after taking into account
+   * `spark.system.reservedMemorySize`
+   */
+  def effectiveMaxMemory(conf: SparkConf) = {
+    Runtime.getRuntime.maxMemory - (1024 * 1024 *
+      memoryStringToMb(conf.get("spark.system.reservedMemorySize", "300m")))
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index dd01ae821f705..39e15c506c89f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -77,7 +77,8 @@ class ExternalAppendOnlyMap[K, V, C](
   private val maxMemoryThreshold = {
     val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
     val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
-    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+    (org.apache.spark.util.Utils.effectiveMaxMemory(sparkConf)
+      * memoryFraction * safetyFraction).toLong
   }

   // Number of pairs in the in-memory map
diff --git a/docs/configuration.md b/docs/configuration.md
index 9c602402f0635..2274e98edb26a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -102,24 +102,31 @@ Apart from these, the following properties are also available, and may be useful
     reduceByKey, etc) when not set by user.
+
+  spark.system.reservedMemorySize
+  300m
+
+    Amount of Heap to reserve for Spark's internal components, before calculating memory available for storage
+    and shuffle as configured in spark.storage.memoryFraction and spark.shuffle.memoryFraction
+
+
   spark.storage.memoryFraction
   0.6
-    Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
-    generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase
-    it if you configure your own old generation size.
+    Fraction of Java heap to use for Spark's memory cache, after accounting for spark.system.reservedMemorySize.
+    The effective size should not be larger than the "old"generation of objects in the JVM.
   spark.shuffle.memoryFraction
   0.3
-    Fraction of Java heap to use for aggregation and cogroups during shuffles, if
-    spark.shuffle.spill is true. At any given time, the collective size of
-    all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
-    begin to spill to disk. If spills are often, consider increasing this value at the expense of
-    spark.storage.memoryFraction.
+    Fraction of Java heap to use for aggregation and cogroups during shuffles, after accounting for
+    spark.system.reservedMemorySize, and if spark.shuffle.spill is true.
+    At any given time, the collective size of all in-memory maps used for shuffles is bounded by this
+    limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing
+    this value at the expense of spark.storage.memoryFraction.
diff --git a/docs/tuning.md b/docs/tuning.md
index 093df3187a789..0873e2f6cae66 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -163,14 +163,15 @@ their work directories), *not* on your driver program.
 **Cache Size Tuning**

 One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
-By default, Spark uses 60% of the configured executor memory (`spark.executor.memory`) to
-cache RDDs. This means that 40% of memory is available for any objects created during task execution.
+By default, Spark caches RDDs using 60% of the configured executor memory (`spark.executor.memory`), after accounting for
+the reserved memory defined by `spark.system.reservedMemorySize`. The remaining memory is used for
+other objects, for example to perform shuffles in-memory. You can change the amount of heap used for caching
+by setting `spark.storage.memoryFraction`. For example, to change this to 50%, you can call
+`conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf.

 In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
-memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call
-`conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf. Combined with the use of serialized caching,
-using a smaller cache should be sufficient to mitigate most of the garbage collection problems.
-In case you are interested in further tuning the Java GC, continue reading below.
+memory, lowering `spark.storage.memoryFraction` or raising `spark.system.reservedMemorySize`
+can provide enough headroom for GC.
 **Advanced GC Tuning**

From e8e0dd7c9e9563a6edd4a9df6e709aad05921795 Mon Sep 17 00:00:00 2001
From: Pat McDonough
Date: Wed, 9 Apr 2014 23:08:33 -0700
Subject: [PATCH 2/3] Increase the default for spark.executor.memory to 1g

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 docs/configuration.md                                   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f7750514ae13d..815912a6283ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -248,7 +248,7 @@ class SparkContext(config: SparkConf) extends Logging {
     .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
     .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
     .map(Utils.memoryStringToMb)
-    .getOrElse(512)
+    .getOrElse(1024)

   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
diff --git a/docs/configuration.md b/docs/configuration.md
index 2274e98edb26a..174827a86be94 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -40,7 +40,7 @@ there are at least five properties that you will commonly want to control:
   Property Name    Default    Meaning
   spark.executor.memory
-  512m
+  1g
     Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g).

From d5c33650a5b94c4983cd9dedc15a886ebc846d7d Mon Sep 17 00:00:00 2001
From: Pat McDonough
Date: Thu, 10 Apr 2014 17:16:50 -0700
Subject: [PATCH 3/3] Updating styles and docs per suggestions in the PR

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++--
 .../spark/util/collection/ExternalAppendOnlyMap.scala | 4 ++--
 docs/configuration.md                                 | 7 ++++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index aa81ac147ce88..baeab59efb143 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1028,7 +1028,7 @@ private[spark] object Utils extends Logging {
    * `spark.system.reservedMemorySize`
    */
   def effectiveMaxMemory(conf: SparkConf) = {
-    Runtime.getRuntime.maxMemory - (1024 * 1024 *
-      memoryStringToMb(conf.get("spark.system.reservedMemorySize", "300m")))
+    val reservedMemorySize = memoryStringToMb(conf.get("spark.system.reservedMemorySize", "300m"))
+    Runtime.getRuntime.maxMemory - (1024 * 1024 * reservedMemorySize)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 39e15c506c89f..a6fcb9a3d5bbf 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -77,8 +77,8 @@ class ExternalAppendOnlyMap[K, V, C](
   private val maxMemoryThreshold = {
     val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
     val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
-    (org.apache.spark.util.Utils.effectiveMaxMemory(sparkConf)
-      * memoryFraction * safetyFraction).toLong
+    val effectiveMaxMemory = org.apache.spark.util.Utils.effectiveMaxMemory(sparkConf)
+    (effectiveMaxMemory * memoryFraction * safetyFraction).toLong
   }

   // Number of pairs in the in-memory map
diff --git a/docs/configuration.md b/docs/configuration.md
index 174827a86be94..a7ffb9244b4c8 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -106,8 +106,9 @@ Apart from these, the following properties are also available, and may be useful
   spark.system.reservedMemorySize
   300m
-    Amount of Heap to reserve for Spark's internal components, before calculating memory available for storage
-    and shuffle as configured in spark.storage.memoryFraction and spark.shuffle.memoryFraction
+    Constant amount of heap to reserve on executors for Spark's own code and user code. Taken into account before calculating
+    memory available for storage and shuffle as configured in spark.storage.memoryFraction and
+    spark.shuffle.memoryFraction.
@@ -115,7 +116,7 @@ Apart from these, the following properties are also available, and may be useful
   0.6
     Fraction of Java heap to use for Spark's memory cache, after accounting for spark.system.reservedMemorySize.
-    The effective size should not be larger than the "old"generation of objects in the JVM.
+    The effective size should not be larger than the "old" generation of objects in the JVM.
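
A minimal, standalone Scala sketch of the memory budgeting these patches describe. The constants mirror the defaults shown in the diffs (300m reserved, 0.6 storage fraction, 0.3 shuffle fraction, 0.8 shuffle safety fraction); the object name, method names, and the 1g example heap are illustrative assumptions, not the patched Spark code itself.

// Standalone sketch (not Spark code): approximates the executor memory budgeting
// introduced by this patch series, using plain Longs instead of SparkConf/Utils.
object ReservedMemorySketch {

  // Assumed defaults, mirroring the values in the diffs.
  val reservedMemoryBytes: Long = 300L * 1024 * 1024 // spark.system.reservedMemorySize = 300m
  val storageFraction = 0.6                          // spark.storage.memoryFraction
  val shuffleFraction = 0.3                          // spark.shuffle.memoryFraction
  val shuffleSafetyFraction = 0.8                    // spark.shuffle.safetyFraction

  /** Heap available to Spark after the fixed reservation, as in Utils.effectiveMaxMemory. */
  def effectiveMaxMemory(maxHeapBytes: Long): Long =
    maxHeapBytes - reservedMemoryBytes

  /** Budget for cached RDD blocks, as in BlockManager.getMaxMemory. */
  def storageBudget(maxHeapBytes: Long): Long =
    (effectiveMaxMemory(maxHeapBytes) * storageFraction).toLong

  /** Budget for in-memory shuffle maps before spilling, as in ExternalAppendOnlyMap. */
  def shuffleBudget(maxHeapBytes: Long): Long =
    (effectiveMaxMemory(maxHeapBytes) * shuffleFraction * shuffleSafetyFraction).toLong

  def main(args: Array[String]): Unit = {
    // With the new 1g default for spark.executor.memory:
    val heap = 1024L * 1024 * 1024
    println(f"effective max memory: ${effectiveMaxMemory(heap) / (1024.0 * 1024)}%.0f MB") // ~724 MB
    println(f"storage budget:       ${storageBudget(heap) / (1024.0 * 1024)}%.1f MB")      // ~434.4 MB
    println(f"shuffle budget:       ${shuffleBudget(heap) / (1024.0 * 1024)}%.1f MB")      // ~173.8 MB
  }
}

With the previous 512m default and no reservation, the same fractions would have left far less headroom, which is the motivation stated in PATCH 1/3 for small out-of-the-box heaps such as the Spark shell.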
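A brief, hypothetical usage sketch: with these patches applied, the new property is set like any other Spark property on a SparkConf. The property spark.system.reservedMemorySize exists only in this patch series and is silently ignored by unpatched builds; the app name and master below are placeholders.

import org.apache.spark.SparkConf

object ReservedMemoryConfExample {
  def main(args: Array[String]): Unit = {
    // Placeholder app name and master; property values mirror the patch defaults.
    val conf = new SparkConf()
      .setAppName("reserved-memory-example")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")              // new default proposed in PATCH 2/3
      .set("spark.storage.memoryFraction", "0.6")      // existing property, default unchanged
      .set("spark.system.reservedMemorySize", "300m")  // property added by PATCH 1/3

    // A build containing PATCH 1/3 reads the reservation through
    // conf.get("spark.system.reservedMemorySize", "300m") inside Utils.effectiveMaxMemory.
    println(conf.get("spark.system.reservedMemorySize"))
  }
}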