From 34cc4dc035f7262678b6561700511e81fe8a368a Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 7 Mar 2018 19:07:50 +0530
Subject: [PATCH] DAGScheduler blocked due to JobSubmitted event

---
 .../apache/spark/scheduler/DAGScheduler.scala      | 54 +++++++++++--------
 .../spark/scheduler/DAGSchedulerEvent.scala        |  3 +-
 .../spark/scheduler/DAGSchedulerSuite.scala        | 17 ++++--
 3 files changed, 47 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84323392..156e058cfb9c3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
 import scala.collection.Map
+import scala.collection.concurrent.TrieMap
 import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.existentials
@@ -140,8 +141,8 @@ class DAGScheduler(
   private[scheduler] def numTotalJobs: Int = nextJobId.get()
   private val nextStageId = new AtomicInteger(0)
 
-  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
-  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
+  private[scheduler] val jobIdToStageIds = new TrieMap[Int, HashSet[Int]]
+  private[scheduler] val stageIdToStage = new TrieMap[Int, Stage]
   /**
    * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
    * that dependency. Only includes stages that are part of currently running job (when the job(s)
@@ -370,7 +371,7 @@ class DAGScheduler(
   /**
   * Create a ResultStage associated with the provided jobId.
   */
-  private def createResultStage(
+  private[scheduler] def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
@@ -600,9 +601,20 @@ class DAGScheduler(
 
     assert(partitions.size > 0)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
-    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
+    val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
+    var finalStage: ResultStage = null
+    try {
+      // New stage creation may throw an exception if, for example, jobs are run on a
+      // HadoopRDD whose underlying HDFS files have been deleted.
+      finalStage = createResultStage(rdd, func2, partitions.toArray, jobId, callSite)
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+        waiter.jobFailed(e)
+        return waiter
+    }
     eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions.toArray, callSite, waiter,
+      jobId, finalStage, partitions.toArray, callSite, waiter,
       SerializationUtils.clone(properties)))
     waiter
   }
@@ -667,8 +679,19 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.length).toArray
     val jobId = nextJobId.getAndIncrement()
+    var finalStage: ResultStage = null
+    try {
+      // New stage creation may throw an exception if, for example, jobs are run on a
+      // HadoopRDD whose underlying HDFS files have been deleted.
+      finalStage = createResultStage(rdd, func2, partitions, jobId, callSite)
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+        listener.jobFailed(e)
+        return listener.awaitResult()
+    }
     eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
+      jobId, finalStage, partitions, callSite, listener, SerializationUtils.clone(properties)))
     listener.awaitResult() // Will throw an exception if the job fails
   }
 
@@ -854,24 +877,11 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleJobSubmitted(jobId: Int,
-      finalRDD: RDD[_],
-      func: (TaskContext, Iterator[_]) => _,
+      finalStage: ResultStage,
       partitions: Array[Int],
       callSite: CallSite,
       listener: JobListener,
       properties: Properties) {
-    var finalStage: ResultStage = null
-    try {
-      // New stage creation may throw an exception if, for example, jobs are run on a
-      // HadoopRDD whose underlying HDFS files have been deleted.
-      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
-    } catch {
-      case e: Exception =>
-        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
-        listener.jobFailed(e)
-        return
-    }
-
     val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
     clearCacheLocs()
     logInfo("Got job %s (%s) with %d output partitions".format(
@@ -1773,8 +1783,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
   }
 
   private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
-    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
-      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
+    case JobSubmitted(jobId, finalStage, partitions, callSite, listener, properties) =>
+      dagScheduler.handleJobSubmitted(jobId, finalStage, partitions, callSite, listener, properties)
 
     case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
       dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 54ab8f8b3e1d8..feccaf0f8e4f8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -36,8 +36,7 @@ private[scheduler] sealed trait DAGSchedulerEvent
 /** A result-yielding job was submitted on a target RDD */
 private[scheduler] case class JobSubmitted(
     jobId: Int,
-    finalRDD: RDD[_],
-    func: (TaskContext, Iterator[_]) => _,
+    finalStage: ResultStage,
     partitions: Array[Int],
     callSite: CallSite,
     listener: JobListener,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d812b5bd92c1b..402aeffb6bead 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -313,9 +313,20 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       partitions: Array[Int],
       func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
       listener: JobListener = jobListener,
-      properties: Properties = null): Int = {
+      properties: Properties = null,
+      scheduler: DAGScheduler = scheduler): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
+    var finalStage: ResultStage = null
+    val callSite = CallSite("", "")
+    try {
+      finalStage = scheduler.createResultStage(rdd, func, partitions, jobId, callSite)
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+        listener.jobFailed(e)
+        return jobId
+    }
+    runEvent(JobSubmitted(jobId, finalStage, partitions, callSite, listener, properties))
     jobId
   }
 
@@ -647,7 +658,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       blockManagerMaster,
       sc.env)
     dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler)
-    val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
+    val jobId = submit(new MyRDD(sc, 1, Nil), Array(0), scheduler = noKillScheduler)
     cancel(jobId)
     // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
     assert(failure === null)
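
For illustration, below is a minimal standalone sketch (not part of the patch)
of the pattern the change adopts: the potentially expensive createResultStage
call now runs on the caller's thread before the JobSubmitted event is posted,
so a slow stage creation (e.g. one waiting on HDFS metadata for a HadoopRDD)
can no longer stall the single-threaded DAGScheduler event loop and every
event queued behind it. All names in the sketch (EventLoopSketch,
expensiveStageCreation) are hypothetical stand-ins, not Spark APIs.

import java.util.concurrent.LinkedBlockingQueue

object EventLoopSketch {

  // The event carries the already-built stage, so the handler stays cheap.
  final case class JobSubmitted(jobId: Int, finalStage: String)

  private val queue = new LinkedBlockingQueue[JobSubmitted]()

  // Stand-in for createResultStage(): may block, e.g. on file-system metadata.
  private def expensiveStageCreation(jobId: Int): String = {
    Thread.sleep(1000) // simulate slow HDFS lookups
    s"ResultStage(job=$jobId)"
  }

  // Mirrors the patched submitJob(): the slow work happens here, on the
  // caller's thread, and only the finished result is posted to the loop.
  def submitJob(jobId: Int): Unit = {
    val finalStage = expensiveStageCreation(jobId)
    queue.put(JobSubmitted(jobId, finalStage))
  }

  def main(args: Array[String]): Unit = {
    // Single consumer thread, analogous to DAGSchedulerEventProcessLoop.
    val loop = new Thread(new Runnable {
      def run(): Unit = while (true) {
        val event = queue.take()
        println(s"handled job ${event.jobId} with ${event.finalStage}")
      }
    })
    loop.setDaemon(true)
    loop.start()

    // Each caller pays for its own stage creation; the loop itself never
    // blocks, so other events (cancellations, completions) stay responsive.
    (1 to 3).foreach(submitJob)
    Thread.sleep(100) // let the daemon loop drain before the JVM exits
  }
}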