From 9301803d018b3ba5195359232bf015dea5635759 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Fri, 18 Apr 2025 00:24:35 -0500 Subject: [PATCH 01/11] Option sketch 2 - fail during submitMissingTasks --- .../apache/spark/scheduler/DAGScheduler.scala | 132 +++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 160 +++++++++++++++--- 2 files changed, 213 insertions(+), 79 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index aee92ba928b4a..96b6433931563 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1552,6 +1552,13 @@ private[spark] class DAGScheduler( // `findMissingPartitions()` returns all partitions every time. stage match { case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable => + // already executed atleast once + if (sms.getNextAttemptId > 0) { + val (allJobsAborted, rollingBackStages) = abortIndeterminateStageChildren(sms) + if (allJobsAborted) { + return + } + } mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId) sms.shuffleDep.newShuffleMergeState() case _ => @@ -2129,60 +2136,7 @@ private[spark] class DAGScheduler( // guaranteed to be determinate, so the input data of the reducers will not change // even if the map tasks are re-tried. if (mapStage.isIndeterminate) { - // It's a little tricky to find all the succeeding stages of `mapStage`, because - // each stage only know its parents not children. Here we traverse the stages from - // the leaf nodes (the result stages of active jobs), and rollback all the stages - // in the stage chains that connect to the `mapStage`. To speed up the stage - // traversing, we collect the stages to rollback first. If a stage needs to - // rollback, all its succeeding stages need to rollback to. 
- val stagesToRollback = HashSet[Stage](mapStage) - - def collectStagesToRollback(stageChain: List[Stage]): Unit = { - if (stagesToRollback.contains(stageChain.head)) { - stageChain.drop(1).foreach(s => stagesToRollback += s) - } else { - stageChain.head.parents.foreach { s => - collectStagesToRollback(s :: stageChain) - } - } - } - - def generateErrorMessage(stage: Stage): String = { - "A shuffle map stage with indeterminate output was failed and retried. " + - s"However, Spark cannot rollback the $stage to re-process the input data, " + - "and has to fail this job. Please eliminate the indeterminacy by " + - "checkpointing the RDD before repartition and try again." - } - - activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil)) - - // The stages will be rolled back after checking - val rollingBackStages = HashSet[Stage](mapStage) - stagesToRollback.foreach { - case mapStage: ShuffleMapStage => - val numMissingPartitions = mapStage.findMissingPartitions().length - if (numMissingPartitions < mapStage.numTasks) { - if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { - val reason = "A shuffle map stage with indeterminate output was failed " + - "and retried. However, Spark can only do this while using the new " + - "shuffle block fetching protocol. Please check the config " + - "'spark.shuffle.useOldFetchProtocol', see more detail in " + - "SPARK-27665 and SPARK-25341." - abortStage(mapStage, reason, None) - } else { - rollingBackStages += mapStage - } - } - - case resultStage: ResultStage if resultStage.activeJob.isDefined => - val numMissingPartitions = resultStage.findMissingPartitions().length - if (numMissingPartitions < resultStage.numTasks) { - // TODO: support to rollback result tasks. 
- abortStage(resultStage, generateErrorMessage(resultStage), None) - } - - case _ => - } + val rollingBackStages = abortIndeterminateStageChildren(mapStage)._2 logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") @@ -2346,6 +2300,76 @@ private[spark] class DAGScheduler( } } + /** + * @param mapStage The indeterminate stage being recomputed + * @return (All jobs with map stage have been aborted, stages which will need to be rolled back) + */ + private def abortIndeterminateStageChildren(mapStage: ShuffleMapStage): + (Boolean, HashSet[Stage]) = { + + assert(mapStage.isIndeterminate) + + // TODO: perhaps materialize this if we are going to compute it often enough ? + // It's a little tricky to find all the succeeding stages of `mapStage`, because + // each stage only know its parents not children. Here we traverse the stages from + // the leaf nodes (the result stages of active jobs), and rollback all the stages + // in the stage chains that connect to the `mapStage`. To speed up the stage + // traversing, we collect the stages to rollback first. If a stage needs to + // rollback, all its succeeding stages need to rollback to. + val stagesToRollback = HashSet[Stage](mapStage) + + def collectStagesToRollback(stageChain: List[Stage]): Unit = { + if (stagesToRollback.contains(stageChain.head)) { + stageChain.drop(1).foreach(s => stagesToRollback += s) + } else { + stageChain.head.parents.foreach { s => + collectStagesToRollback(s :: stageChain) + } + } + } + + def generateErrorMessage(stage: Stage): String = { + "A shuffle map stage with indeterminate output was failed and retried. " + + s"However, Spark cannot rollback the $stage to re-process the input data, " + + "and has to fail this job. 
Please eliminate the indeterminacy by " + + "checkpointing the RDD before repartition and try again." + } + + activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil)) + val numJobsWithStage = activeJobs.count(job => stagesToRollback.contains(job.finalStage)) + + // The stages will be rolled back after checking + val rollingBackStages = HashSet[Stage](mapStage) + var numAbortedJobs = 0 + stagesToRollback.foreach { + case mapStage: ShuffleMapStage => + val numMissingPartitions = mapStage.findMissingPartitions().length + if (numMissingPartitions < mapStage.numTasks) { + if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { + val reason = "A shuffle map stage with indeterminate output was failed " + + "and retried. However, Spark can only do this while using the new " + + "shuffle block fetching protocol. Please check the config " + + "'spark.shuffle.useOldFetchProtocol', see more detail in " + + "SPARK-27665 and SPARK-25341." + abortStage(mapStage, reason, None) + } else { + rollingBackStages += mapStage + } + } + + case resultStage: ResultStage if resultStage.activeJob.isDefined => + val numMissingPartitions = resultStage.findMissingPartitions().length + if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + abortStage(resultStage, generateErrorMessage(resultStage), None) + numAbortedJobs += 1 + } + + case _ => + } + (numAbortedJobs >= numJobsWithStage, rollingBackStages) + } + /** * Whether executor is decommissioning or decommissioned. 
* Return true when: diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3e507df706ba5..d5a2db9ef5329 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -18,11 +18,11 @@ package org.apache.spark.scheduler import java.util.{ArrayList => JArrayList, Collections => JCollections, Properties} -import java.util.concurrent.{CountDownLatch, Delayed, ScheduledFuture, TimeUnit} +import java.util.concurrent.{CountDownLatch, Delayed, LinkedBlockingQueue, ScheduledFuture, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} import scala.annotation.meta.param -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.jdk.CollectionConverters._ import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -56,28 +56,31 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) dagScheduler.setEventProcessLoop(this) - private var isProcessing = false - private val eventQueue = new ListBuffer[DAGSchedulerEvent]() - + private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]() override def post(event: DAGSchedulerEvent): Unit = { - if (isProcessing) { - // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially. So we should - // buffer events for sequent processing later instead of processing them recursively. - eventQueue += event - } else { - try { - isProcessing = true - // Forward event to `onReceive` directly to avoid processing event asynchronously. 
- onReceive(event) - } catch { - case NonFatal(e) => onError(e) - } finally { - isProcessing = false - } - if (eventQueue.nonEmpty) { - post(eventQueue.remove(0)) - } + // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially in the main test + // thread similarly as it is done in production using the "dag-scheduler-event-loop". + // So we should buffer events for sequent processing later instead of executing them + // on thread calling post() (which might be the "dag-scheduler-message" thread for some + // events posted by the DAGScheduler itself) + eventQueue.put(event) + } + + def runEvents(): Unit = { + var dagEvent = eventQueue.poll() + while (dagEvent != null) { + onReciveWithErrorHandler(dagEvent) + dagEvent = eventQueue.poll() + } + } + + private def onReciveWithErrorHandler(event: DAGSchedulerEvent): Unit = { + try { + // Forward event to `onReceive` directly to avoid processing event asynchronously. + onReceive(event) + } catch { + case NonFatal(e) => onError(e) } } @@ -306,7 +309,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti var broadcastManager: BroadcastManager = null var securityMgr: SecurityManager = null var scheduler: DAGScheduler = null - var dagEventProcessLoopTester: DAGSchedulerEventProcessLoop = null + var dagEventProcessLoopTester: DAGSchedulerEventProcessLoopTester = null /** * Set of cache locations to return from our mock BlockManagerMaster. 
@@ -479,6 +482,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // Ensure the initialization of various components sc dagEventProcessLoopTester.post(event) + dagEventProcessLoopTester.runEvents() } /** @@ -1190,11 +1194,12 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti private def completeNextStageWithFetchFailure( stageId: Int, attemptIdx: Int, - shuffleDep: ShuffleDependency[_, _, _]): Unit = { + shuffleDep: ShuffleDependency[_, _, _], + srcHost: String = "hostA"): Unit = { val stageAttempt = taskSets.last checkStageId(stageId, attemptIdx, stageAttempt) complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) => - (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, idx, "ignored"), null) + (FetchFailed(makeBlockManagerId(srcHost), shuffleDep.shuffleId, 0L, 0, idx, "ignored"), null) }.toSeq) } @@ -2251,6 +2256,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(completedStage === List(0, 1)) Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() // map stage resubmitted assert(scheduler.runningStages.size === 1) val mapStage = scheduler.runningStages.head @@ -2286,6 +2292,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti sc.listenerBus.waitUntilEmpty() Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() // map stage is running by resubmitted, result stage is waiting // map tasks and the origin result task 1.0 are running assert(scheduler.runningStages.size == 1, "Map stage should be running") @@ -3125,6 +3132,105 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(countSubmittedMapStageAttempts() === 2) } + /** + * This function creates the following dependency graph: + * + * (determinate) (indeterminate) + * shuffleMapRdd0 shuffleMapRDD1 + * \ / + * \ / + * finalRdd + * 
+ * Both ShuffleMapRdds will be ShuffleMapStages with 2 partitions executed on + * hostA_exec and hostB_exec. + */ + def constructMixedDeterminateDependencies(): + (ShuffleDependency[_, _, _], ShuffleDependency[_, _, _]) = { + val numPartitions = 2 + val shuffleMapRdd0 = new MyRDD(sc, numPartitions, Nil, indeterminate = false) + + val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2)) + val shuffleId0 = shuffleDep0.shuffleId + val shuffleMapRdd1 = + new MyRDD(sc, numPartitions, Nil, tracker = mapOutputTracker, indeterminate = true) + + val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2)) + val shuffleId1 = shuffleDep1.shuffleId + val finalRdd = + new MyRDD(sc, numPartitions, List(shuffleDep0, shuffleDep1), tracker = mapOutputTracker) + + submit(finalRdd, Array(0, 1)) + val stageId0 = this.scheduler.shuffleIdToMapStage(shuffleId0).id + + // Finish the first shuffle map stage. + completeShuffleMapStageSuccessfully(0, 0, numPartitions, Seq("hostA", "hostB")) + completeShuffleMapStageSuccessfully(1, 0, numPartitions, Seq("hostA", "hostB")) + assert(mapOutputTracker.findMissingPartitions(0) === Some(Seq.empty)) + assert(mapOutputTracker.findMissingPartitions(1) === Some(Seq.empty)) + + (shuffleDep0, shuffleDep1) + } + + test("SPARK-51272: re-submit of an indeterminate stage whithout partial result can succeed") { + val shuffleDeps = constructMixedDeterminateDependencies() + val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage] + + // the fetch failure is from the determinate shuffle map stage but this leads to + // executor lost and removing the shuffle files generated by the indeterminate stage too + completeNextStageWithFetchFailure(resultStage.id, 0, shuffleDeps._1, "hostA") + + Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() + assert(scheduler.runningStages.size === 2) + assert(scheduler.runningStages.forall(_.isInstanceOf[ShuffleMapStage])) + + 
completeShuffleMapStageSuccessfully(0, 1, 2, Seq("hostA", "hostB")) + completeShuffleMapStageSuccessfully(1, 1, 2, Seq("hostA", "hostB")) + assert(scheduler.runningStages.size === 1) + assert(scheduler.runningStages.head === resultStage) + assert(resultStage.latestInfo.failureReason.isEmpty) + + completeNextResultStageWithSuccess(resultStage.id, 1) + } + + test("SPARK-51272: re-submit of an indeterminate stage whith partial result will fail") { + val shuffleDeps = constructMixedDeterminateDependencies() + val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage] + + runEvent(makeCompletionEvent(taskSets(2).tasks(0), Success, 42)) + // the fetch failure is from the determinate shuffle map stage but this leads to + // executor lost and removing the shuffle files generated by the indeterminate stage too + runEvent(makeCompletionEvent( + taskSets(2).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleDeps._1.shuffleId, 0L, 0, 0, "ignored"), + null)) + + dagEventProcessLoopTester.runEvents() + // resubmission has not yet happened, so job is still running + assert(scheduler.activeJobs.nonEmpty) + Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() + + // all dependent jobs have been failed + assert(scheduler.runningStages.size === 0) + assert(scheduler.activeJobs.isEmpty) + assert(resultStage.latestInfo.failureReason.isDefined) + assert(resultStage.latestInfo.failureReason.get. + contains("A shuffle map stage with indeterminate output was failed and retried. 
" + + "However, Spark cannot rollback the ResultStage")) + + + // assert(scheduler.runningStages.size === 2) + // assert(scheduler.runningStages.forall(_.isInstanceOf[ShuffleMapStage])) + + // completeShuffleMapStageSuccessfully(0, 1, 2, Seq("hostA", "hostB")) + // completeShuffleMapStageSuccessfully(1, 1, 2, Seq("hostA", "hostB")) + // assert(scheduler.runningStages.size === 0) + // assert(resultStage.latestInfo.failureReason.isDefined) + // assert(resultStage.latestInfo.failureReason.get === "Job aborted due to stage failure: " + + // "Re-submit of a partially completed indeterminate result stage is not supported") + } + private def constructIndeterminateStageFetchFailed(): (Int, Int) = { val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true) @@ -4884,6 +4990,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // wait resubmit sc.listenerBus.waitUntilEmpty() Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() // stage0 retry val stage0Retry = taskSets.filter(_.stageId == 1) @@ -4984,6 +5091,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // the stages will now get resubmitted due to the failure Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() // parent map stage resubmitted assert(scheduler.runningStages.size === 1) @@ -5003,6 +5111,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345), Array.fill[Long](2)(2), mapTaskId = taskIdCount))) Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() // The retries should succeed sc.listenerBus.waitUntilEmpty() @@ -5012,6 +5121,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // This will add 3 new stages. 
submit(reduceRdd, Array(0, 1)) Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + dagEventProcessLoopTester.runEvents() // Only the last stage needs to execute, and those tasks - so completed stages should not // change. From 3028c56467352924e3c44554b28b508355616e8b Mon Sep 17 00:00:00 2001 From: attilapiros Date: Wed, 30 Apr 2025 15:30:24 -0700 Subject: [PATCH 02/11] step1 --- .../org/apache/spark/scheduler/DAGScheduler.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 96b6433931563..5386ff8edbfac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1554,8 +1554,10 @@ private[spark] class DAGScheduler( case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable => // already executed atleast once if (sms.getNextAttemptId > 0) { - val (allJobsAborted, rollingBackStages) = abortIndeterminateStageChildren(sms) - if (allJobsAborted) { + val allStageDependentJobsAborted = abortIndeterminateStageChildren(sms)._1 + if (allStageDependentJobsAborted) { + logInfo("All jobs depending on this indeterminate stage (" + stage + ") were " + + "aborted so this stage is not needed anymore.") return } } @@ -2340,7 +2342,6 @@ private[spark] class DAGScheduler( // The stages will be rolled back after checking val rollingBackStages = HashSet[Stage](mapStage) - var numAbortedJobs = 0 stagesToRollback.foreach { case mapStage: ShuffleMapStage => val numMissingPartitions = mapStage.findMissingPartitions().length @@ -2362,12 +2363,15 @@ private[spark] class DAGScheduler( if (numMissingPartitions < resultStage.numTasks) { // TODO: support to rollback result tasks. 
abortStage(resultStage, generateErrorMessage(resultStage), None) - numAbortedJobs += 1 } case _ => } - (numAbortedJobs >= numJobsWithStage, rollingBackStages) + + val numActiveJobsWithStageAfterRollback = + activeJobs.count(job => stagesToRollback.contains(job.finalStage)) + + (numActiveJobsWithStageAfterRollback == 0, rollingBackStages) } /** From a40cd3e53c16a573a4da41035f239eea45df8b33 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Wed, 30 Apr 2025 17:11:08 -0700 Subject: [PATCH 03/11] remove 1 line --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5386ff8edbfac..d426e57dd19d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2338,7 +2338,6 @@ private[spark] class DAGScheduler( } activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil)) - val numJobsWithStage = activeJobs.count(job => stagesToRollback.contains(job.finalStage)) // The stages will be rolled back after checking val rollingBackStages = HashSet[Stage](mapStage) From 10bf4c9bd6cb1a1186f380895461afdb5beb4db3 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Wed, 30 Apr 2025 17:43:45 -0700 Subject: [PATCH 04/11] Step2 --- .../apache/spark/scheduler/DAGScheduler.scala | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d426e57dd19d6..1a2acb32a66e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1554,8 +1554,13 @@ private[spark] class DAGScheduler( case sms: ShuffleMapStage if stage.isIndeterminate && 
!sms.isAvailable => // already executed atleast once if (sms.getNextAttemptId > 0) { - val allStageDependentJobsAborted = abortIndeterminateStageChildren(sms)._1 - if (allStageDependentJobsAborted) { + val stagesToRollback = collectSucceedingStages(sms) + rollBackStages(stagesToRollback) + // stages which cannot be rolled back were aborted which leads to removing the + // the dependant job(s) from the active jobs set + val numActiveJobsWithStageAfterRollback = + activeJobs.count(job => stagesToRollback.contains(job.finalStage)) + if (numActiveJobsWithStageAfterRollback == 0) { logInfo("All jobs depending on this indeterminate stage (" + stage + ") were " + "aborted so this stage is not needed anymore.") return @@ -2138,7 +2143,8 @@ private[spark] class DAGScheduler( // guaranteed to be determinate, so the input data of the reducers will not change // even if the map tasks are re-tried. if (mapStage.isIndeterminate) { - val rollingBackStages = abortIndeterminateStageChildren(mapStage)._2 + val stagesToRollback = collectSucceedingStages(mapStage) + val rollingBackStages = rollBackStages(stagesToRollback) logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") @@ -2302,15 +2308,7 @@ private[spark] class DAGScheduler( } } - /** - * @param mapStage The indeterminate stage being recomputed - * @return (All jobs with map stage have been aborted, stages which will need to be rolled back) - */ - private def abortIndeterminateStageChildren(mapStage: ShuffleMapStage): - (Boolean, HashSet[Stage]) = { - - assert(mapStage.isIndeterminate) - + private def collectSucceedingStages(mapStage: ShuffleMapStage): HashSet[Stage] = { // TODO: perhaps materialize this if we are going to compute it often enough ? 
// It's a little tricky to find all the succeeding stages of `mapStage`, because // each stage only know its parents not children. Here we traverse the stages from @@ -2318,17 +2316,26 @@ private[spark] class DAGScheduler( // in the stage chains that connect to the `mapStage`. To speed up the stage // traversing, we collect the stages to rollback first. If a stage needs to // rollback, all its succeeding stages need to rollback to. - val stagesToRollback = HashSet[Stage](mapStage) + val succeedingStages = HashSet[Stage](mapStage) - def collectStagesToRollback(stageChain: List[Stage]): Unit = { - if (stagesToRollback.contains(stageChain.head)) { - stageChain.drop(1).foreach(s => stagesToRollback += s) + def collectSucceedingStagesInternal(stageChain: List[Stage]): Unit = { + if (succeedingStages.contains(stageChain.head)) { + stageChain.drop(1).foreach(s => succeedingStages += s) } else { stageChain.head.parents.foreach { s => - collectStagesToRollback(s :: stageChain) + collectSucceedingStagesInternal(s :: stageChain) } } } + activeJobs.foreach(job => collectSucceedingStagesInternal(job.finalStage :: Nil)) + succeedingStages + } + + /** + * @param stagesToRollback stages to roll back + * @return Shuffle map stages which need and can be rolled back + */ + private def rollBackStages(stagesToRollback: HashSet[Stage]): HashSet[Stage] = { def generateErrorMessage(stage: Stage): String = { "A shuffle map stage with indeterminate output was failed and retried. " + @@ -2337,10 +2344,8 @@ private[spark] class DAGScheduler( "checkpointing the RDD before repartition and try again." 
} - activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil)) - // The stages will be rolled back after checking - val rollingBackStages = HashSet[Stage](mapStage) + val rollingBackStages = HashSet[Stage]() stagesToRollback.foreach { case mapStage: ShuffleMapStage => val numMissingPartitions = mapStage.findMissingPartitions().length @@ -2367,10 +2372,7 @@ private[spark] class DAGScheduler( case _ => } - val numActiveJobsWithStageAfterRollback = - activeJobs.count(job => stagesToRollback.contains(job.finalStage)) - - (numActiveJobsWithStageAfterRollback == 0, rollingBackStages) + rollingBackStages } /** From 60e575c1d4642ca9fb3c9e858ca21f6c2aefdd8e Mon Sep 17 00:00:00 2001 From: attilapiros Date: Wed, 30 Apr 2025 20:43:29 -0700 Subject: [PATCH 05/11] fix logging by using Structured Logging Framework --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 1a2acb32a66e5..7290d9eb3533d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1561,8 +1561,8 @@ private[spark] class DAGScheduler( val numActiveJobsWithStageAfterRollback = activeJobs.count(job => stagesToRollback.contains(job.finalStage)) if (numActiveJobsWithStageAfterRollback == 0) { - logInfo("All jobs depending on this indeterminate stage (" + stage + ") were " + - "aborted so this stage is not needed anymore.") + logInfo(log"All jobs depending on the indeterminate stage " + + log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is not needed anymore.") return } } From dd141c585595b4e24d8b9ad43ef32df0c67d9110 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Sat, 3 May 2025 01:26:09 -0700 Subject: [PATCH 06/11] use 'mapStage.numAvailableOutputs > 0' instead of 
'numMissingPartitions < mapStage.numTasks' --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7290d9eb3533d..888e7417bfecd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2348,8 +2348,7 @@ private[spark] class DAGScheduler( val rollingBackStages = HashSet[Stage]() stagesToRollback.foreach { case mapStage: ShuffleMapStage => - val numMissingPartitions = mapStage.findMissingPartitions().length - if (numMissingPartitions < mapStage.numTasks) { + if (mapStage.numAvailableOutputs > 0) { if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { val reason = "A shuffle map stage with indeterminate output was failed " + "and retried. However, Spark can only do this while using the new " + From 66ca27463b9f7d4b064fec04a743050f6f2c6f9e Mon Sep 17 00:00:00 2001 From: attilapiros Date: Sun, 4 May 2025 11:25:57 -0700 Subject: [PATCH 07/11] add an extra assert --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d5a2db9ef5329..049bba9089550 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -3218,17 +3218,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(resultStage.latestInfo.failureReason.get. contains("A shuffle map stage with indeterminate output was failed and retried. 
" + "However, Spark cannot rollback the ResultStage")) - - - // assert(scheduler.runningStages.size === 2) - // assert(scheduler.runningStages.forall(_.isInstanceOf[ShuffleMapStage])) - - // completeShuffleMapStageSuccessfully(0, 1, 2, Seq("hostA", "hostB")) - // completeShuffleMapStageSuccessfully(1, 1, 2, Seq("hostA", "hostB")) - // assert(scheduler.runningStages.size === 0) - // assert(resultStage.latestInfo.failureReason.isDefined) - // assert(resultStage.latestInfo.failureReason.get === "Job aborted due to stage failure: " + - // "Re-submit of a partially completed indeterminate result stage is not supported") + assert(scheduler.activeJobs.isEmpty, "Aborting the stage aborts the job as well.") } private def constructIndeterminateStageFetchFailed(): (Int, Int) = { From b09a77dae392dc1bff7764111980ab598bb34d52 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 7 May 2025 19:36:42 +0200 Subject: [PATCH 08/11] minor fix --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 049bba9089550..d4e90be7c66dd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -3148,19 +3148,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti (ShuffleDependency[_, _, _], ShuffleDependency[_, _, _]) = { val numPartitions = 2 val shuffleMapRdd0 = new MyRDD(sc, numPartitions, Nil, indeterminate = false) - val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2)) - val shuffleId0 = shuffleDep0.shuffleId + val shuffleMapRdd1 = new MyRDD(sc, numPartitions, Nil, tracker = mapOutputTracker, indeterminate = true) - val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2)) - val 
shuffleId1 = shuffleDep1.shuffleId + val finalRdd = new MyRDD(sc, numPartitions, List(shuffleDep0, shuffleDep1), tracker = mapOutputTracker) submit(finalRdd, Array(0, 1)) - val stageId0 = this.scheduler.shuffleIdToMapStage(shuffleId0).id // Finish the first shuffle map stage. completeShuffleMapStageSuccessfully(0, 0, numPartitions, Seq("hostA", "hostB")) @@ -3171,7 +3168,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti (shuffleDep0, shuffleDep1) } - test("SPARK-51272: re-submit of an indeterminate stage whithout partial result can succeed") { + test("SPARK-51272: re-submit of an indeterminate stage without partial result can succeed") { val shuffleDeps = constructMixedDeterminateDependencies() val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage] @@ -3193,7 +3190,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti completeNextResultStageWithSuccess(resultStage.id, 1) } - test("SPARK-51272: re-submit of an indeterminate stage whith partial result will fail") { + test("SPARK-51272: re-submit of an indeterminate stage with partial result will fail") { val shuffleDeps = constructMixedDeterminateDependencies() val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage] From ec7f3738efe17d87850f150cbeb0e69418873995 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Fri, 16 May 2025 14:38:18 -0700 Subject: [PATCH 09/11] addressing review comments --- .../org/apache/spark/scheduler/DAGScheduler.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 888e7417bfecd..648b83e85ab7f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1552,10 +1552,16 @@ private[spark] class DAGScheduler( // 
`findMissingPartitions()` returns all partitions every time. stage match { case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable => - // already executed atleast once + // already executed at least once if (sms.getNextAttemptId > 0) { + // While we previously validated possible rollbacks during the handling of a FetchFailure, + // where we were fetching from an indeterminate source map stage, this later check + // covers additional cases like recalculating an indeterminate stage after an executor + // loss. Moreover, because this check occurs later in the process, if a result stage task + // has successfully completed, we can detect this and abort the job, as rolling back a + // result stage is not possible. val stagesToRollback = collectSucceedingStages(sms) - rollBackStages(stagesToRollback) + validateStageRollBacks(stagesToRollback) // stages which cannot be rolled back were aborted which leads to removing the // the dependant job(s) from the active jobs set val numActiveJobsWithStageAfterRollback = @@ -2144,7 +2150,7 @@ private[spark] class DAGScheduler( // even if the map tasks are re-tried.
if (mapStage.isIndeterminate) { val stagesToRollback = collectSucceedingStages(mapStage) - val rollingBackStages = rollBackStages(stagesToRollback) + val rollingBackStages = validateStageRollBacks(stagesToRollback) logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") @@ -2335,7 +2341,7 @@ private[spark] class DAGScheduler( * @param stagesToRollback stages to roll back * @return Shuffle map stages which need and can be rolled back */ - private def rollBackStages(stagesToRollback: HashSet[Stage]): HashSet[Stage] = { + private def validateStageRollBacks(stagesToRollback: HashSet[Stage]): HashSet[Stage] = { def generateErrorMessage(stage: Stage): String = { "A shuffle map stage with indeterminate output was failed and retried. " + From 63ca5d7f74174feb800cf502298ea5829e12d27c Mon Sep 17 00:00:00 2001 From: attilapiros Date: Sat, 17 May 2025 05:29:33 -0700 Subject: [PATCH 10/11] validateStageRollBacks -> abortInvalidStageRollBacks --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 648b83e85ab7f..e80b9104f25ef 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1561,7 +1561,7 @@ private[spark] class DAGScheduler( // has successfully completed, we can detect this and abort the job, as rolling back a // result stage is not possible. 
val stagesToRollback = collectSucceedingStages(sms) - validateStageRollBacks(stagesToRollback) + abortInvalidStageRollBacks(stagesToRollback) // stages which cannot be rolled back were aborted which leads to removing the // the dependant job(s) from the active jobs set val numActiveJobsWithStageAfterRollback = @@ -2150,7 +2150,7 @@ private[spark] class DAGScheduler( // even if the map tasks are re-tried. if (mapStage.isIndeterminate) { val stagesToRollback = collectSucceedingStages(mapStage) - val rollingBackStages = validateStageRollBacks(stagesToRollback) + val rollingBackStages = abortInvalidStageRollBacks(stagesToRollback) logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") @@ -2338,10 +2338,12 @@ private[spark] class DAGScheduler( } /** + * Abort stages where roll back is requested but cannot be completed. + * * @param stagesToRollback stages to roll back * @return Shuffle map stages which need and can be rolled back */ - private def validateStageRollBacks(stagesToRollback: HashSet[Stage]): HashSet[Stage] = { + private def abortInvalidStageRollBacks(stagesToRollback: HashSet[Stage]): HashSet[Stage] = { def generateErrorMessage(stage: Stage): String = { "A shuffle map stage with indeterminate output was failed and retried. 
" + From 6c91f8be6031f0db1f8a3233bf5c0a91f2a13df2 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Sat, 17 May 2025 08:49:30 -0700 Subject: [PATCH 11/11] abortInvalidStageRollBacks -> abortStageWithInvalidRollBack --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e80b9104f25ef..baf0ed4df5309 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1561,7 +1561,7 @@ private[spark] class DAGScheduler( // has successfully completed, we can detect this and abort the job, as rolling back a // result stage is not possible. val stagesToRollback = collectSucceedingStages(sms) - abortInvalidStageRollBacks(stagesToRollback) + abortStageWithInvalidRollBack(stagesToRollback) // stages which cannot be rolled back were aborted which leads to removing the // the dependant job(s) from the active jobs set val numActiveJobsWithStageAfterRollback = @@ -2150,7 +2150,7 @@ private[spark] class DAGScheduler( // even if the map tasks are re-tried. 
if (mapStage.isIndeterminate) { val stagesToRollback = collectSucceedingStages(mapStage) - val rollingBackStages = abortInvalidStageRollBacks(stagesToRollback) + val rollingBackStages = abortStageWithInvalidRollBack(stagesToRollback) logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") @@ -2343,7 +2343,7 @@ private[spark] class DAGScheduler( * @param stagesToRollback stages to roll back * @return Shuffle map stages which need and can be rolled back */ - private def abortInvalidStageRollBacks(stagesToRollback: HashSet[Stage]): HashSet[Stage] = { + private def abortStageWithInvalidRollBack(stagesToRollback: HashSet[Stage]): HashSet[Stage] = { def generateErrorMessage(stage: Stage): String = { "A shuffle map stage with indeterminate output was failed and retried. " +