From fbb53edaa5dbe429167995286762fa7b907c0e0a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 29 Mar 2018 12:50:16 +0800 Subject: [PATCH 1/8] [SPARK-23811][Core] Same tasks' FetchFailed event comes before Success will cause child stage never succeed --- .../spark/scheduler/TaskSetManager.scala | 13 +++ .../spark/scheduler/TaskSetManagerSuite.scala | 92 ++++++++++++++++++- 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d958658527f6d..7f453a77e364b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -794,6 +794,19 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } + // Kill any other attempts for this FetchFailed task + for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + + s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + + s"as the attempt ${info.attemptNumber} failed because FetchFailed") + killedByOtherAttempt(index) = true + sched.backend.killTask( + attemptInfo.taskId, + attemptInfo.executorId, + interruptThread = true, + reason = "another attempt fetch failed") + } + None case ef: ExceptionFailure => diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ca6a7e5db3b17..9aa34b65e22f6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -745,7 +745,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(resubmittedTasks === 0) } - test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") { val conf = new SparkConf().set("spark.speculation", "true") sc = new SparkContext("local", "test", conf) @@ -853,6 +852,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(resubmittedTasks === 0) } + test("Fetch failed task should not have success completion event") { + sc = new SparkContext("local", "test") + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set("spark.speculation.multiplier", "0.0") + sc.conf.set("spark.speculation", "true") + + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + var killTaskCalled = false + sched.initialize(new FakeSchedulerBackend() { + override def killTask(taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String): Unit = { + // Check the only one killTask event in this case, which triggered by + // task 3.1 fetch failed. + assert(taskId === 3) + assert(executorId === "exec2") + assert(interruptThread) + assert(reason === "another attempt fetch failed") + killTaskCalled = true + } + }) + + // Keep track of the original tasks will not has SUCCESS event. 
+ var originTaskSuccess = false + val dagScheduler = new FakeDAGScheduler(sc, sched) { + override def taskEnded(task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + taskInfo: TaskInfo): Unit = { + super.taskEnded(task, reason, result, accumUpdates, taskInfo) + reason match { + case Success if taskInfo.taskId == 3 && + taskInfo.attemptNumber == 0 && taskInfo.index == 3 => + originTaskSuccess = true + case _ => + } + } + } + sched.dagScheduler.stop() + sched.setDAGScheduler(dagScheduler) + + val taskSet = FakeTask.createTaskSet(4) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 4 tasks to start + for ((k, v) <- List( + "exec1" -> "host1", + "exec1" -> "host1", + "exec2" -> "host2", + "exec2" -> "host2")) { + val taskOption = manager.resourceOffer(k, v, NO_PREF) + assert(taskOption.isDefined) + val task = taskOption.get + assert(task.executorId === k) + } + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + clock.advance(1) + // Complete the 3 tasks and leave 1 task in running + for (id <- Set(0, 1, 2)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + + // checkSpeculatableTasks checks that the task runtime is greater than the threshold for + // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for + // > 0ms, so advance the clock by 1ms here. + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(3)) + + // Offer resource to start the speculative attempt for the running task + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + assert(taskOption5.isDefined) + val task5 = taskOption5.get + assert(task5.index === 3) + assert(task5.taskId === 4) + assert(task5.executorId === "exec1") + assert(task5.attemptNumber === 1) + // sched.backend = mock(classOf[SchedulerBackend]) + // The speculative task raise a FetchFailed + manager.handleFailedTask(4, TaskState.FAILED, + FetchFailed(BlockManagerId("exec1", "host1", 12345), 0, 0, 0, "ignored")) + // Check the original task killed by FetchFailed. 
+ assert(!originTaskSuccess && killTaskCalled) + } + test("speculative and noPref task should be scheduled after node-local") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler( From cda726875d6fdddc02fa6d885507d7695a75510f Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 2 Apr 2018 15:25:07 +0800 Subject: [PATCH 2/8] Ignore success event for fetching failed task --- .../org/apache/spark/scheduler/TaskSetManager.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7f453a77e364b..5b7509b5ff12a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -82,11 +82,13 @@ private[spark] class TaskSetManager( val successful = new Array[Boolean](numTasks) private val numFailures = new Array[Int](numTasks) - // Set the coresponding index of Boolean var when the task killed by other attempt tasks, + // Set the corresponding index of Boolean var when the task killed by other attempt tasks, // this happened while we set the `spark.speculation` to true. The task killed by others // should not resubmit while executor lost. private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks) + private val fetchFailedTaskIndexSet = new HashSet[Int] + val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) private[scheduler] var tasksSuccessful = 0 @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } + } else if (fetchFailedTaskIndexSet.contains(index)) { + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + + " because task " + index + " has already failed by FetchFailed") + return } else { logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") @@ -793,7 +799,7 @@ private[spark] class TaskSetManager( blacklistTracker.foreach(_.updateBlacklistForFetchFailure( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } - + fetchFailedTaskIndexSet.add(index) // Kill any other attempts for this FetchFailed task for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + From 0defc09dbcbd0b227eab583d0426b5dc78232b37 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 16 Apr 2018 17:52:58 +0800 Subject: [PATCH 3/8] Delete useless code --- .../spark/scheduler/TaskSetManager.scala | 12 --- .../spark/scheduler/TaskSetManagerSuite.scala | 91 ------------------- 2 files changed, 103 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5b7509b5ff12a..49bf60aa144f2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -800,18 +800,6 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } fetchFailedTaskIndexSet.add(index) - // Kill any other attempts for this FetchFailed task - for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { - logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + - s"in stage ${taskSet.id} (TID 
${attemptInfo.taskId}) on ${attemptInfo.host} " + - s"as the attempt ${info.attemptNumber} failed because FetchFailed") - killedByOtherAttempt(index) = true - sched.backend.killTask( - attemptInfo.taskId, - attemptInfo.executorId, - interruptThread = true, - reason = "another attempt fetch failed") - } None diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 9aa34b65e22f6..6013e18026958 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -852,97 +852,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(resubmittedTasks === 0) } - test("Fetch failed task should not have success completion event") { - sc = new SparkContext("local", "test") - // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set("spark.speculation.multiplier", "0.0") - sc.conf.set("spark.speculation", "true") - - sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) - var killTaskCalled = false - sched.initialize(new FakeSchedulerBackend() { - override def killTask(taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String): Unit = { - // Check the only one killTask event in this case, which triggered by - // task 3.1 fetch failed. - assert(taskId === 3) - assert(executorId === "exec2") - assert(interruptThread) - assert(reason === "another attempt fetch failed") - killTaskCalled = true - } - }) - - // Keep track of the original tasks will not has SUCCESS event. - var originTaskSuccess = false - val dagScheduler = new FakeDAGScheduler(sc, sched) { - override def taskEnded(task: Task[_], - reason: TaskEndReason, - result: Any, - accumUpdates: Seq[AccumulatorV2[_, _]], - taskInfo: TaskInfo): Unit = { - super.taskEnded(task, reason, result, accumUpdates, taskInfo) - reason match { - case Success if taskInfo.taskId == 3 && - taskInfo.attemptNumber == 0 && taskInfo.index == 3 => - originTaskSuccess = true - case _ => - } - } - } - sched.dagScheduler.stop() - sched.setDAGScheduler(dagScheduler) - - val taskSet = FakeTask.createTaskSet(4) - val clock = new ManualClock() - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => - task.metrics.internalAccums - } - // Offer resources for 4 tasks to start - for ((k, v) <- List( - "exec1" -> "host1", - "exec1" -> "host1", - "exec2" -> "host2", - "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(k, v, NO_PREF) - assert(taskOption.isDefined) - val task = taskOption.get - assert(task.executorId === k) - } - assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) - clock.advance(1) - // Complete the 3 tasks and leave 1 task in running - for (id <- Set(0, 1, 2)) { - manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) - assert(sched.endedTasks(id) === Success) - } - - // checkSpeculatableTasks checks that the task runtime is greater than the threshold for - // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for - // > 0ms, so advance the clock by 1ms here. 
- clock.advance(1) - assert(manager.checkSpeculatableTasks(0)) - assert(sched.speculativeTasks.toSet === Set(3)) - - // Offer resource to start the speculative attempt for the running task - val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) - assert(taskOption5.isDefined) - val task5 = taskOption5.get - assert(task5.index === 3) - assert(task5.taskId === 4) - assert(task5.executorId === "exec1") - assert(task5.attemptNumber === 1) - // sched.backend = mock(classOf[SchedulerBackend]) - // The speculative task raise a FetchFailed - manager.handleFailedTask(4, TaskState.FAILED, - FetchFailed(BlockManagerId("exec1", "host1", 12345), 0, 0, 0, "ignored")) - // Check the original task killed by FetchFailed. - assert(!originTaskSuccess && killTaskCalled) - } - test("speculative and noPref task should be scheduled after node-local") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler( From df1768d6b0138a41476a8a243a2fcf24a29dfbfa Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 18 Apr 2018 10:49:25 +0800 Subject: [PATCH 4/8] handle this case in DAGScheduler --- .../apache/spark/scheduler/DAGScheduler.scala | 3 ++ .../spark/scheduler/DAGSchedulerSuite.scala | 44 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8c46a84323392..2a143fb610355 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") + } else if (failedStages.contains(shuffleStage)) { + logInfo(s"Ignoring task $smt because of stage $shuffleStage have " + + s"been marked as failed") } else { // The epoch of the task is acceptable (i.e., the task was launched after the most // recent failure we're aware of for the executor), so mark the task's output as diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d812b5bd92c1b..08e37c933fa83 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" + + " never succeed") { + // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC + val rddA = new MyRDD(sc, 2, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) + val shuffleIdA = shuffleDepA.shuffleId + + val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + + val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + + submit(rddC, Array(0, 1)) + + // Complete both tasks in rddA. 
+ assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)))) + + // The first task success + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + + // The second task's speculative attempt fails first, but task self still running. + // This may caused by ExecutorLost. + runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) + // Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + // The second result task self success soon. + runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) + // Missing partition number should not change, otherwise it will cause child stage + // never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. From ba6f71a0fc49ce2a07addec3496177c4b2b43fef Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 18 Apr 2018 10:51:45 +0800 Subject: [PATCH 5/8] delete the changes in TaskSetManager --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 49bf60aa144f2..1ba9273887b8f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -87,8 +87,6 @@ private[spark] class TaskSetManager( // should not resubmit while executor lost. 
   private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
 
-  private val fetchFailedTaskIndexSet = new HashSet[Int]
-
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
 
   private[scheduler] var tasksSuccessful = 0
@@ -752,10 +750,6 @@ private[spark] class TaskSetManager(
       if (tasksSuccessful == numTasks) {
         isZombie = true
       }
-    } else if (fetchFailedTaskIndexSet.contains(index)) {
-      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
-        " because task " + index + " has already failed by FetchFailed")
-      return
     } else {
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
@@ -799,7 +793,6 @@ private[spark] class TaskSetManager(
       blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
         fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
     }
-    fetchFailedTaskIndexSet.add(index)
 
     None
 

From a201764c94b21e294f0a32cb71019b422e8d8090 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 25 Apr 2018 23:29:00 +0800
Subject: [PATCH 6/8] Add UT simulating the same scenario with a ResultTask

---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 36 +++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 08e37c933fa83..e0157c9e8c32f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2403,8 +2403,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    * This tests the case where origin task success after speculative task got FetchFailed
    * before.
    */
-  test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" +
-    " never succeed") {
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following" +
+    "successful tasks") {
     // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
     val rddA = new MyRDD(sc, 2, Nil)
     val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
@@ -2443,6 +2443,38 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
   }
 
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following" +
+    "successful tasks") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    submit(rddB, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds
+    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but task self still running.
+    // This may caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // The second result task self success soon.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

From 7f8503f7f921568a09b967ddf75f2ce2f027e197 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 25 Apr 2018 23:40:00 +0800
Subject: [PATCH 7/8] Add more checks

---
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e0157c9e8c32f..0615fcca23568 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2403,7 +2403,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    * This tests the case where origin task success after speculative task got FetchFailed
    * before.
    */
-  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following" +
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
     "successful tasks") {
     // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
     val rddA = new MyRDD(sc, 2, Nil)
     val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
@@ -2443,7 +2443,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
   }
 
-  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following" +
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
     "successful tasks") {
     val rddA = new MyRDD(sc, 2, Nil)
     val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
     val shuffleIdA = shuffleDepA.shuffleId
@@ -2468,6 +2468,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(1).tasks(1),
       FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
       null))
+    // Make sure failedStages is not empty now
+    assert(scheduler.failedStages.nonEmpty)
     // The second result task self success soon.
     assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
     runEvent(makeCompletionEvent(

From fee903c65c59219cdc1c0937ac8be4777142ffbd Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 26 Apr 2018 13:53:45 +0800
Subject: [PATCH 8/8] Fix this at the beginning of handleTaskCompletion

---
 .../apache/spark/scheduler/DAGScheduler.scala | 19 ++++++++++++-------
 .../spark/scheduler/DAGSchedulerSuite.scala   |  7 +++++++
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2a143fb610355..e09300df7bea0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1178,19 +1178,27 @@ class DAGScheduler(
       event.taskInfo.attemptNumber, // this is a task attempt number
       event.reason)
 
-    if (!stageIdToStage.contains(task.stageId)) {
-      // The stage may have already finished when we get this event -- eg. maybe it was a
+    val stageOpt = stageIdToStage.get(task.stageId)
+
+    if (stageOpt.isEmpty || (failedStages.contains(stageOpt.get) && event.reason == Success)) {
+      // The stage may have already finished or failed when we get this event -- eg. maybe it was a
       // speculative task. It is important that we send the TaskEnd event in any case, so listeners
       // are properly notified and can chose to handle it. For instance, some listeners are
       // doing their own accounting and if they don't get the task end event they think
       // tasks are still running when they really aren't.
+      val msg = if (stageOpt.isEmpty) {
+        "its stage has already finished"
+      } else {
+        s"its stage ${stageOpt.get} has been marked as failed"
+      }
+      logWarning(s"Ignoring task $task because $msg")
       postTaskEnd(event)
-      // Skip all the actions if the stage has been cancelled.
+      // Skip all the actions if the stage has been cancelled or failed.
       return
     }
 
-    val stage = stageIdToStage(task.stageId)
+    val stage = stageOpt.get
 
     // Make sure the task's accumulators are updated before any other processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The accumulators are
@@ -1266,9 +1274,6 @@ class DAGScheduler(
           }
           if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
             logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
-          } else if (failedStages.contains(shuffleStage)) {
-            logInfo(s"Ignoring task $smt because of stage $shuffleStage have " +
-              s"been marked as failed")
           } else {
             // The epoch of the task is acceptable (i.e., the task was launched after the most
             // recent failure we're aware of for the executor), so mark the task's output as
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 0615fcca23568..c66c03bc9e786 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2472,6 +2472,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(scheduler.failedStages.nonEmpty)
     // The second result task self success soon.
     assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    // This late success event will be ignored by the DAGScheduler because the stage already failed
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // Resubmit the failed stages and let the job finally succeed
+    scheduler.resubmitFailedStages()
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostB", 2)))
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
     assertDataStructuresEmpty()
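
The net effect of the series is the single guard that PATCH 8 places at the top of handleTaskCompletion: once a stage is in failedStages, a late Success for one of its tasks must not change any scheduler bookkeeping (only the TaskEnd event is still posted for listeners). The sketch below illustrates that ordering problem and the guard in isolation. It is a minimal, self-contained Scala example; MiniScheduler, Stage, Task and CompletionEvent are simplified stand-ins invented for the illustration, not the real org.apache.spark.scheduler classes.

import scala.collection.mutable

object FetchFailedOrderingSketch {

  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  final case class FetchFailed(mapStageId: Int) extends TaskEndReason

  final case class Stage(id: Int)
  final case class Task(stageId: Int, partitionId: Int)
  final case class CompletionEvent(task: Task, reason: TaskEndReason)

  final class MiniScheduler {
    val stageIdToStage = mutable.Map[Int, Stage]()
    val failedStages = mutable.Set[Stage]()
    // (stageId, partitionId) pairs whose output has been registered as available
    val registeredOutputs = mutable.Set[(Int, Int)]()

    // Mirrors the guard added in PATCH 8: a Success that arrives after its stage was
    // already marked as failed must not mutate scheduler state.
    def handleTaskCompletion(event: CompletionEvent): Unit = {
      val stageOpt = stageIdToStage.get(event.task.stageId)
      val lateSuccessForFailedStage =
        stageOpt.exists(s => failedStages.contains(s)) && event.reason == Success
      if (stageOpt.isEmpty || lateSuccessForFailedStage) {
        // In the real DAGScheduler the TaskEnd event would still be posted here
        // (postTaskEnd); everything else is skipped.
        return
      }
      event.reason match {
        case Success =>
          registeredOutputs += ((event.task.stageId, event.task.partitionId))
        case FetchFailed(_) =>
          failedStages += stageOpt.get
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val sched = new MiniScheduler
    sched.stageIdToStage(1) = Stage(1)
    val task = Task(stageId = 1, partitionId = 0)

    // The speculative attempt hits a FetchFailed first and fails the stage ...
    sched.handleTaskCompletion(CompletionEvent(task, FetchFailed(mapStageId = 0)))
    // ... then the original attempt reports Success. Without the guard this late Success
    // would mark the partition as complete even though its output is gone, so the
    // resubmitted stage would never regenerate it and the child stage could never finish.
    sched.handleTaskCompletion(CompletionEvent(task, Success))

    assert(sched.registeredOutputs.isEmpty)
    println(s"failed stages: ${sched.failedStages}, registered outputs: ${sched.registeredOutputs}")
  }
}

If the guard were removed, the second call would record the partition as complete -- the sketch-level analogue of the missing-partition count dropping in the DAGSchedulerSuite tests above, which is what left the child stage waiting forever.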