From aa63161c0e5ee535b220dbfbb07997ff4c4f0722 Mon Sep 17 00:00:00 2001
From: Zhen Peng
Date: Mon, 26 May 2014 21:15:21 +0800
Subject: [PATCH 1/2] SPARK-1929 DAGScheduler suspended by local task OOM

---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  9 ++++++++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 14 ++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e24a3d85..c9f3058167de5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
 
-import java.io.NotSerializableException
+import java.io.{NotSerializableException, PrintWriter, StringWriter}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
@@ -580,6 +580,13 @@ class DAGScheduler(
       case e: Exception =>
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val errors: StringWriter = new StringWriter()
+        oom.printStackTrace(new PrintWriter(errors))
+        val exception = new SparkException("job failed for: " + oom.getMessage() +
+          "\n" + errors.toString())
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1ac8e1b..81e64c1846ed5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array(new Partition { override def index = 0 })
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    assert(results.size == 0)
+    assertDataStructuresEmpty
+  }
+
   test("run trivial job w/ dependency") {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))

From 76f7eda28b931579042ba08bb4ef1ca6ce60494a Mon Sep 17 00:00:00 2001
From: Zhen Peng
Date: Tue, 27 May 2014 11:43:49 +0800
Subject: [PATCH 2/2] remove redundant memory allocations

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c9f3058167de5..c70aa0e6e4523 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -581,10 +581,7 @@ class DAGScheduler(
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
       case oom: OutOfMemoryError =>
-        val errors: StringWriter = new StringWriter()
-        oom.printStackTrace(new PrintWriter(errors))
-        val exception = new SparkException("job failed for: " + oom.getMessage() +
-          "\n" + errors.toString())
+        val exception = new SparkException("job failed for Out of memory exception", oom)
        jobResult = JobFailed(exception)
         job.listener.jobFailed(exception)
     } finally {
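
Note: the sketch below is not part of the patch series. It is a minimal,
self-contained illustration of the pattern the two commits apply: a fatal
OutOfMemoryError thrown by a locally-run task is caught and wrapped in an
exception handed to the job listener, so the error cannot escape and suspend
the DAGScheduler's thread. The SparkException and JobListener definitions are
simplified stand-ins for Spark's own classes, and runLocally is a hypothetical
condensation of runLocallyWithinThread, not Spark's actual method.

// Standalone sketch, assuming simplified stand-ins for Spark's classes.
object LocalJobOomSketch {
  // Stand-in for org.apache.spark.SparkException.
  final class SparkException(message: String, cause: Throwable)
    extends Exception(message, cause)

  // Stand-in for the scheduler's JobListener.
  trait JobListener {
    def jobFailed(exception: Exception): Unit
  }

  // Condensed form of the catch block after PATCH 2/2: both Exception and
  // OutOfMemoryError are reported through the listener instead of being
  // allowed to kill the calling thread.
  def runLocally(task: () => Unit, listener: JobListener): Unit = {
    try {
      task()
    } catch {
      case e: Exception =>
        listener.jobFailed(e)
      case oom: OutOfMemoryError =>
        // As in PATCH 2/2: keep the error as the exception's cause rather
        // than allocating a StringWriter to render its stack trace.
        listener.jobFailed(
          new SparkException("job failed for Out of memory exception", oom))
    }
  }

  def main(args: Array[String]): Unit = {
    val listener = new JobListener {
      def jobFailed(exception: Exception): Unit =
        println("job failed: " + exception.getMessage +
          ", cause: " + exception.getCause)
    }
    // The synthetic OOM is caught and reported; the thread keeps running.
    runLocally(() => throw new OutOfMemoryError("test local job oom"), listener)
  }
}

Passing the error as the exception's cause preserves the stack trace while
avoiding the StringWriter/PrintWriter allocations of PATCH 1/2, which matter
precisely when the JVM is already out of memory.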