From 6ae1ee82f49e10166c29c538f452503236d06531 Mon Sep 17 00:00:00 2001
From: qqsun8819
Date: Sun, 9 Mar 2014 14:19:10 +0800
Subject: [PATCH 1/3] Add a static function to LocalBackend so that it uses the
 core count specified by spark.cores.max when no cores are passed to it

---
 .../spark/scheduler/local/LocalBackend.scala | 27 ++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 16e2f5cf3076d..f683d81f9972f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -79,9 +79,12 @@ private[spark] class LocalActor(
  * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
  * on a single Executor (created by the LocalBackend) running locally.
  */
-private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
+private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, toUseCores: Int = Int.MaxValue)
   extends SchedulerBackend with ExecutorBackend {
 
+  val totalCores = LocalBackend.getMaxCores(
+    scheduler.sc.conf.getOption("spark.cores.max").map(_.toInt), toUseCores)
+
   var localActor: ActorRef = null
 
   override def start() {
@@ -107,3 +110,25 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
 }
+
+private[spark] object LocalBackend {
+  private def getMaxCores(confMaxCores: Option[Int], toUseCores: Int) : Int = {
+    if (toUseCores != Int.MaxValue) {
+      // If the caller explicitly passed in the desired number of cores,
+      // use it instead of the default taken from spark.cores.max
+      toUseCores
+    } else {
+      val retCores = confMaxCores.getOrElse(toUseCores)
+      val realCores = Runtime.getRuntime.availableProcessors()
+      if (retCores > realCores) {
+        // If spark.cores.max exceeds the machine's logical cores, use the logical core count
+        realCores
+      } else {
+        retCores
+      }
+    }
+
+
+
+  }
+}

From 78b9c60ce8279189e486479fbb211410c1a1b73c Mon Sep 17 00:00:00 2001
From: qqsun8819
Date: Sun, 9 Mar 2014 15:28:23 +0800
Subject: [PATCH 2/3] 1. When MASTER=local, SparkContext now uses the default
 core count instead of 1 to construct the LocalBackend, so that spark-shell
 and cores specified on the command line are honored. 2. Change some test
 cases from "local" to "local[1]".
 3. SparkContextSchedulerCreationSuite now tests the spark.cores.max config in
 the local pattern.

---
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../spark/scheduler/local/LocalBackend.scala  |  3 ---
 .../scala/org/apache/spark/FileSuite.scala    |  4 ++--
 .../SparkContextSchedulerCreationSuite.scala  | 19 ++++++++++++++++---
 4 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ce25573834829..32487d1c22b6b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1204,7 +1204,7 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, 1)
+        val backend = new LocalBackend(scheduler)
         scheduler.initialize(backend)
         scheduler
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index f683d81f9972f..5d3de0c0dd407 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -127,8 +127,5 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, toUseCores: Int
         retCores
       }
     }
-
-
-
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 76173608e9f70..3390fee43782f 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 class FileSuite extends FunSuite with LocalSparkContext {
 
   test("text files") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local[1]", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -175,7 +175,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local[1]", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 3bb936790d506..0b90d54a0ba1b 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,10 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String, conf: SparkConf = new SparkConf()): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
     sched.asInstanceOf[TaskSchedulerImpl]
@@ -44,13 +44,26 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("local") {
-    val sched = createTaskScheduler("local")
+    val conf = new SparkConf()
+    conf.set("spark.cores.max", "1")
+    val sched = createTaskScheduler("local", conf)
     sched.backend match {
       case s: LocalBackend => assert(s.totalCores === 1)
       case _ => fail()
     }
   }
 
+  test("local-cores-exceed") {
+    val cores = Runtime.getRuntime.availableProcessors() + 1
+    val conf = new SparkConf()
+    conf.set("spark.cores.max", cores.toString)
+    val sched = createTaskScheduler("local", conf)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+      case _ => fail()
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)

From 731aefa014daec8d72db245a87d3694667df0e54 Mon Sep 17 00:00:00 2001
From: qqsun8819
Date: Wed, 12 Mar 2014 21:31:32 +0800
Subject: [PATCH 3/3] 1. Leave LocalBackend unchanged. 2. In SparkContext,
 compute the core count to use and pass it to the original LocalBackend
 constructor.

---
 .../scala/org/apache/spark/SparkContext.scala |  5 +++-
 .../spark/scheduler/local/LocalBackend.scala  | 24 +------------------
 2 files changed, 5 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 32487d1c22b6b..03cd7c34a22d5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1204,7 +1204,10 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler)
+        // Use the cores specified in the config, capped at the number of available cores
+        val realCores = Runtime.getRuntime.availableProcessors()
+        val toUseCores = math.min(sc.conf.getInt("spark.cores.max", realCores), realCores)
+        val backend = new LocalBackend(scheduler, toUseCores)
         scheduler.initialize(backend)
         scheduler
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 5d3de0c0dd407..16e2f5cf3076d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -79,12 +79,9 @@ private[spark] class LocalActor(
  * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
  * on a single Executor (created by the LocalBackend) running locally.
  */
-private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, toUseCores: Int = Int.MaxValue)
+private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
   extends SchedulerBackend with ExecutorBackend {
 
-  val totalCores = LocalBackend.getMaxCores(
-    scheduler.sc.conf.getOption("spark.cores.max").map(_.toInt), toUseCores)
-
   var localActor: ActorRef = null
 
   override def start() {
@@ -110,22 +107,3 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, toUseCores: Int
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
 }
-
-private[spark] object LocalBackend {
-  private def getMaxCores(confMaxCores: Option[Int], toUseCores: Int) : Int = {
-    if (toUseCores != Int.MaxValue) {
-      // If the caller explicitly passed in the desired number of cores,
-      // use it instead of the default taken from spark.cores.max
-      toUseCores
-    } else {
-      val retCores = confMaxCores.getOrElse(toUseCores)
-      val realCores = Runtime.getRuntime.availableProcessors()
-      if (retCores > realCores) {
-        // If spark.cores.max exceeds the machine's logical cores, use the logical core count
-        realCores
-      } else {
-        retCores
-      }
-    }
-  }
-}
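
Note: the following is a minimal standalone sketch (not part of the patches above) of the core-count logic that PATCH 3 moves into SparkContext: read spark.cores.max, cap it at the JVM's logical core count, and fall back to all available cores when the setting is absent. The object and method names here are illustrative only.

import org.apache.spark.SparkConf

object LocalCoresSketch {
  // Never hand the local backend more cores than the machine actually has.
  def coresForLocalMode(conf: SparkConf): Int = {
    val realCores = Runtime.getRuntime.availableProcessors()
    math.min(conf.getInt("spark.cores.max", realCores), realCores)
  }

  def main(args: Array[String]): Unit = {
    // Build a config without loading system properties, then request more cores than exist.
    val conf = new SparkConf(false).set("spark.cores.max", "64")
    println(coresForLocalMode(conf))                  // capped at the machine's logical core count
    println(coresForLocalMode(new SparkConf(false)))  // unset: also the full logical core count
  }
}

With a value computed this way, SparkContext can construct the backend as new LocalBackend(scheduler, toUseCores), which is what the final patch does.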