Closed

29 commits
641936f
Make sure mapStage.pendingtasks is set() while MapStage.isAvailable i…
suyanNone Jan 15, 2015
741ab4d
Add sub-class canEqual to make ShuffleMapTask not equal Result Task w…
suyanNone Jan 26, 2015
5ec8a82
Refine
suyanNone Jan 29, 2015
9044720
add parameter type
suyanNone Jan 30, 2015
03db624
Refine
suyanNone Jan 30, 2015
8dfdc18
Refine log message use string interpolation
suyanNone Feb 27, 2015
c286f7a
Refine with the latest spark
suyanNone Feb 27, 2015
dcf1533
Refine solution and fix bug in code
suyanNone Jun 18, 2015
27da8e7
Refine codeStyle
suyanNone Jun 18, 2015
bd5fec4
Refine Tests
suyanNone Jun 19, 2015
4dbe4d3
Refine assert exception
suyanNone Jun 19, 2015
9dfff63
Refine the test 'ignore late map task completions'
suyanNone Jun 19, 2015
e1e0b66
Refine the testcase
suyanNone Jul 3, 2015
2379250
Refine scala style
suyanNone Jul 3, 2015
314873a
Only tracker partitionId to instead of Task
suyanNone Jul 21, 2015
7ae128e
Refine test name
suyanNone Jul 21, 2015
b2df3fd
refine suite code
suyanNone Jul 21, 2015
9f5276f
Merge branch 'master' into SPARK-5259
squito Jul 27, 2015
e4aa266
minor style updates
squito Jul 27, 2015
a7d0dff
Merge branch 'master' into SPARK-5259
squito Aug 21, 2015
1519aec
better test name
squito Aug 21, 2015
51f3c47
dont use Thread.sleep
squito Aug 21, 2015
c010a44
partitions.length
squito Aug 25, 2015
920fcca
formatting of CompletionEvent
squito Aug 25, 2015
52545d8
add comment summarizing test case
squito Aug 25, 2015
19b3c4c
Merge branch 'master' into SPARK-5259
squito Aug 28, 2015
752d407
test case for task resubmission after an executor is lost
squito Aug 30, 2015
f986d63
Merge branch 'master' into SPARK-5259
squito Sep 21, 2015
5f43546
partitions.length
squito Sep 21, 2015
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -944,7 +944,7 @@ class DAGScheduler(
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()
stage.pendingPartitions.clear()

// First figure out the indexes of partition ids to compute.
val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
@@ -1060,8 +1060,8 @@ class DAGScheduler(

if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1152,7 +1152,7 @@ class DAGScheduler(
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
@@ -1198,7 +1198,7 @@ class DAGScheduler(
shuffleStage.addOutputLoc(smt.partitionId, status)
}

if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
@@ -1242,7 +1242,7 @@ class DAGScheduler(

case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingTasks += task
stage.pendingPartitions += task.partitionId

case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
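Taken together, the DAGScheduler hunks above switch the bookkeeping from a set of Task objects (stage.pendingTasks) to a set of partition ids (stage.pendingPartitions): the set is cleared and repopulated in submitMissingTasks, a partition id is removed on Success, re-added on Resubmitted, and a shuffle stage is only marked finished once the set is empty. The standalone sketch below is a toy illustration of why partition ids are the sturdier key (assumptions: FakeTask, FakeStage and PendingPartitionsDemo are made-up stand-ins, not Spark classes): a resubmitted stage creates fresh Task instances, but a late completion from an earlier attempt still names the same partition id, so removal keeps working.

import scala.collection.mutable.HashSet

// Toy stand-ins for illustration only; the real types are
// org.apache.spark.scheduler.Task and org.apache.spark.scheduler.Stage.
case class FakeTask(stageAttemptId: Int, partitionId: Int)

class FakeStage {
  // mirrors the new `val pendingPartitions = new HashSet[Int]` in Stage.scala
  val pendingPartitions = new HashSet[Int]

  def submitMissingTasks(tasks: Seq[FakeTask]): Unit = {
    pendingPartitions.clear()
    pendingPartitions ++= tasks.map(_.partitionId)
  }

  def onTaskSuccess(task: FakeTask): Unit = {
    // removal works regardless of which stage attempt produced the completion,
    // because partition ids are stable across attempts
    pendingPartitions -= task.partitionId
  }

  def isFinished: Boolean = pendingPartitions.isEmpty
}

object PendingPartitionsDemo extends App {
  val stage = new FakeStage
  // the second attempt of the stage submits fresh Task objects
  stage.submitMissingTasks(Seq(FakeTask(1, 0), FakeTask(1, 1), FakeTask(1, 2)))
  // completions then arrive from both the old attempt (0) and the new attempt (1)
  stage.onTaskSuccess(FakeTask(0, 2)) // late completion from the first attempt
  stage.onTaskSuccess(FakeTask(1, 0))
  stage.onTaskSuccess(FakeTask(1, 1))
  assert(stage.isFinished) // all partitions are accounted for
}
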
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -66,7 +66,7 @@ private[scheduler] abstract class Stage(
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

var pendingTasks = new HashSet[Task[_]]
val pendingPartitions = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

@@ -487,8 +487,8 @@ private[spark] class TaskSetManager(
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
taskName, taskId, host, taskLocality, serializedTask.limit))
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s"$taskLocality, ${serializedTask.limit} bytes)")

sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
197 changes: 182 additions & 15 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -479,8 +479,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
@@ -490,7 +490,7 @@ class DAGSchedulerSuite
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
// we can see both result blocks now
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
HashSet("hostA", "hostB"))
@@ -782,8 +782,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
HashSet("hostA", "hostB"))
@@ -1035,6 +1035,173 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}

/**
* This test runs a three-stage job, with a fetch failure in stage 1. But during the retry, we
* have completions from both the first & second attempt of stage 1. So all the map output is
* available before we finish any task set for stage 1. We want to make sure that we don't
* submit stage 2 until the map output for stage 1 is registered.
*/
test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") {
Review comment (Contributor): Add a comment at the top of the test that summarizes the sequence of events.

val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, null)
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))

// things start out smoothly, stage 0 completes with no issues
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
(Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
))

// then one executor dies, and a task fails in stage 1
runEvent(ExecutorLost("exec-hostA"))
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
null,
null,
createFakeTaskInfo(),
null))

// so we resubmit stage 0, which completes happily
scheduler.resubmitFailedStages()
val stage0Resubmit = taskSets(2)
assert(stage0Resubmit.stageId == 0)
assert(stage0Resubmit.stageAttemptId === 1)
val task = stage0Resubmit.tasks(0)
assert(task.partitionId === 2)
runEvent(CompletionEvent(
task,
Success,
makeMapStatus("hostC", shuffleMapRdd.partitions.length),
null,
createFakeTaskInfo(),
null))

// now here is where things get tricky: we will now have a task set representing
// the second attempt for stage 1, but we *also* have some tasks for the first attempt for
// stage 1 still going
val stage1Resubmit = taskSets(3)
assert(stage1Resubmit.stageId == 1)
assert(stage1Resubmit.stageAttemptId === 1)
assert(stage1Resubmit.tasks.length === 3)

// we'll have some tasks finish from the first attempt, and some finish from the second attempt,
// so that we actually have all stage outputs, though no attempt has completed all its
// tasks
runEvent(CompletionEvent(
taskSets(3).tasks(0),
Success,
makeMapStatus("hostC", reduceRdd.partitions.length),
null,
createFakeTaskInfo(),
null))
runEvent(CompletionEvent(
taskSets(3).tasks(1),
Success,
makeMapStatus("hostC", reduceRdd.partitions.length),
null,
createFakeTaskInfo(),
null))
// late task finish from the first attempt
runEvent(CompletionEvent(
taskSets(1).tasks(2),
Success,
makeMapStatus("hostB", reduceRdd.partitions.length),
null,
createFakeTaskInfo(),
null))

// What should happen now is that we submit stage 2. However, we might not see an error
// b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
// we can check some conditions.
Review comment (Contributor): Instead of doing this, why not add a mechanism to grab the errors during tests? Doesn't have to be in this PR but it would make more sense.

// Note that the really important thing here is not so much that we submit stage 2 *immediately*
// but that we don't end up with some error from these interleaved completions. It would also
// be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
// all its tasks complete

// check that we have all the map output for stage 0 (it should have been there even before
// the last round of completions from stage 1, but just to double check it hasn't been messed
// up) and also the newly available stage 1
val stageToReduceIdxs = Seq(
0 -> (0 until 3),
1 -> (0 until 1)
)
for {
(stage, reduceIdxs) <- stageToReduceIdxs
reduceIdx <- reduceIdxs
} {
// this would throw an exception if the map status hadn't been registered
val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx)
// really we should have already thrown an exception rather than fail either of these
// asserts, but just to be extra defensive let's double check the statuses are OK
assert(statuses != null)
assert(statuses.nonEmpty)
}

// and check that stage 2 has been submitted
assert(taskSets.size == 5)
val stage2TaskSet = taskSets(4)
assert(stage2TaskSet.stageId == 2)
assert(stage2TaskSet.stageAttemptId == 0)
}
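
As a follow-up to the review thread above about DAGScheduler swallowing errors and only logging them, one possible shape for a "mechanism to grab the errors during tests" is sketched below. This is purely a hedged illustration, not part of this PR or of Spark's test utilities; ErrorCapturingEventLoop, its handler parameter and capturedErrors are hypothetical names.

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical test-only helper: wraps an event handler and records, rather than
// swallows, any exception thrown while processing an event.
class ErrorCapturingEventLoop[E](handler: E => Unit) {
  // exceptions observed during the test, exposed for assertions
  val capturedErrors = new ArrayBuffer[Throwable]()

  def post(event: E): Unit = {
    try {
      handler(event)
    } catch {
      case NonFatal(t) => capturedErrors += t
    }
  }
}

A suite could route its runEvent calls through such a wrapper and finish each test with assert(errorLoop.capturedErrors.isEmpty), instead of only checking indirect conditions such as which task sets were submitted.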

/**
* We lose an executor after completing some shuffle map tasks on it. Those tasks get
* resubmitted, and when they finish, the job completes normally.
*/
test("register map outputs correctly after ExecutorLost and task Resubmitted") {
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, null)
val reduceRdd = new MyRDD(sc, 5, List(firstShuffleDep))
submit(reduceRdd, Array(0))

// complete some of the tasks from the first stage, on one host
runEvent(CompletionEvent(
taskSets(0).tasks(0), Success,
makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(
taskSets(0).tasks(1), Success,
makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))

// now that host goes down
runEvent(ExecutorLost("exec-hostA"))

// so we resubmit those tasks
runEvent(CompletionEvent(
taskSets(0).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(
taskSets(0).tasks(1), Resubmitted, null, null, createFakeTaskInfo(), null))

// now complete everything on a different host
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))
))

// now we should submit stage 1, and the map output from stage 0 should be registered

// check that we have all the map output for stage 0
(0 until reduceRdd.partitions.length).foreach { reduceIdx =>
val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
// really we should have already thrown an exception rather than fail either of these
// asserts, but just to be extra defensive let's double check the statuses are OK
assert(statuses != null)
assert(statuses.nonEmpty)
}

// and check that stage 1 has been submitted
assert(taskSets.size == 2)
val stage1TaskSet = taskSets(1)
assert(stage1TaskSet.stageId == 1)
assert(stage1TaskSet.stageAttemptId == 0)
}

/**
* Makes sure that failures of stage used by multiple jobs are correctly handled.
*
@@ -1393,8 +1560,8 @@ class DAGSchedulerSuite
// Submit a map stage by itself
submitMapStage(shuffleDep)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
assert(results.size === 1)
results.clear()
assertDataStructuresEmpty()
@@ -1407,7 +1574,7 @@ class DAGSchedulerSuite
// Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
// from, then TaskSet 3 will run the reduce stage
scheduler.resubmitFailedStages()
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
complete(taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
results.clear()
@@ -1452,33 +1619,33 @@ class DAGSchedulerSuite
// Complete the first stage
assert(taskSets(0).stageId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", rdd1.partitions.size)),
(Success, makeMapStatus("hostB", rdd1.partitions.size))))
(Success, makeMapStatus("hostA", rdd1.partitions.length)),
(Success, makeMapStatus("hostB", rdd1.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
assert(listener1.results.size === 1)

// When attempting the second stage, show a fetch failure
assert(taskSets(1).stageId === 1)
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostA", rdd2.partitions.size)),
(Success, makeMapStatus("hostA", rdd2.partitions.length)),
(FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()
assert(listener2.results.size === 0) // Second stage listener should not have a result yet

// Stage 0 should now be running as task set 2; make its task succeed
assert(taskSets(2).stageId === 0)
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostC", rdd2.partitions.size))))
(Success, makeMapStatus("hostC", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
assert(listener2.results.size === 0) // Second stage listener should still not have a result

// Stage 1 should now be running as task set 3; make its first task succeed
assert(taskSets(3).stageId === 1)
complete(taskSets(3), Seq(
(Success, makeMapStatus("hostB", rdd2.partitions.size)),
(Success, makeMapStatus("hostD", rdd2.partitions.size))))
(Success, makeMapStatus("hostB", rdd2.partitions.length)),
(Success, makeMapStatus("hostD", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
assert(listener2.results.size === 1)
@@ -1494,7 +1661,7 @@ class DAGSchedulerSuite
// TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2
assert(taskSets(5).stageId === 1)
complete(taskSets(5), Seq(
(Success, makeMapStatus("hostE", rdd2.partitions.size))))
(Success, makeMapStatus("hostE", rdd2.partitions.length))))
complete(taskSets(6), Seq(
(Success, 53)))
assert(listener3.results === Map(0 -> 52, 1 -> 53))