From 6c40b9f3d8b5935d5930f17b61ee11384cf2b011 Mon Sep 17 00:00:00 2001 From: liujianhui Date: Wed, 8 Mar 2017 19:55:48 +0800 Subject: [PATCH 1/3] [SPARK-19868] conflict TasksetManager lead to spark stopped --- .../apache/spark/scheduler/TaskSetManager.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 19ebaf817e24e..c46b07e5293c7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -697,13 +697,7 @@ private[spark] class TaskSetManager( val index = info.index info.markFinished(TaskState.FINISHED) removeRunningTask(tid) - // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the - // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not - // "deserialize" the value when holding a lock to avoid blocking other threads. So we call - // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. - // Note: "result.value()" only deserializes the value when it's called at the first time, so - // here "result.value()" just returns the value and won't block other threads. - sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) + // Kill any other attempts for the same task (since those are unnecessary now that one // attempt completed successfully). for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { @@ -726,6 +720,13 @@ private[spark] class TaskSetManager( logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } + // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the + // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not + // "deserialize" the value when holding a lock to avoid blocking other threads. So we call + // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. + // Note: "result.value()" only deserializes the value when it's called at the first time, so + // here "result.value()" just returns the value and won't block other threads. + sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) maybeFinishTaskSet() } From 17acd55f57498195360d4b5876047c92005c3a68 Mon Sep 17 00:00:00 2001 From: liujianhui Date: Tue, 28 Mar 2017 09:09:48 +0800 Subject: [PATCH 2/3] [SPARK-19868] conflict TasksetManager lead to spark stopped --- .../spark/scheduler/TaskSetManagerSuite.scala | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2c2cda9f318eb..69c8ea3940945 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -22,8 +22,10 @@ import java.util.{Properties, Random} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.mockito.Matchers.{anyInt, anyString} +import org.mockito.Matchers.{any, anyInt, anyString} import org.mockito.Mockito.{mock, never, spy, verify, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.apache.spark._ import org.apache.spark.internal.config @@ -1042,6 +1044,29 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.isZombie) } + + test("SPARK-19868: DagScheduler only notified of taskEnd when state is ready") { + // dagScheduler.taskEnded() is async, so it may *seem* ok to call it before we've set all + // appropriate state, eg. isZombie. However, this sets up a race that could go the wrong way. + // This is a super-focused regression test which checks the zombie state as soon as + // dagScheduler.taskEnded() is called, to ensure we haven't introduced a race. + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + val mockDAGScheduler = mock(classOf[DAGScheduler]) + sched.dagScheduler = mockDAGScheduler + val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock) + when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] { + override def answer(invocationOnMock: InvocationOnMock): Unit = { + assert(manager.isZombie === true) + } + }) + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) + assert(taskOption.isDefined) + // this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon + manager.handleSuccessfulTask(0, createTaskResult(0)) + } + test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1")) From fd6739255df23c54e7c19080d73eeeca389754b4 Mon Sep 17 00:00:00 2001 From: liujianhui Date: Tue, 28 Mar 2017 14:55:43 +0800 Subject: [PATCH 3/3] [SPARK-19868] conflict TasksetManager lead to spark stopped --- .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index a0093c1d1dcab..9ca6b8b0fe635 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1069,7 +1069,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val mockDAGScheduler = mock(classOf[DAGScheduler]) sched.dagScheduler = mockDAGScheduler val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0) - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1)) when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] { override def answer(invocationOnMock: InvocationOnMock): Unit = { assert(manager.isZombie === true)