From 3584a09410d72207d8a00dcbf385b2e276925fc8 Mon Sep 17 00:00:00 2001 From: wuyi Date: Fri, 6 Apr 2018 23:34:18 +0800 Subject: [PATCH 1/6] add task attempt running check in hasAttemptOnHost --- .../spark/scheduler/TaskSetManager.scala | 2 +- .../spark/scheduler/TaskSetManagerSuite.scala | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d958658527f6d..97df3cdd16e95 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -289,7 +289,7 @@ private[spark] class TaskSetManager( /** Check whether a task is currently running an attempt on a given host */ private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = { - taskAttempts(taskIndex).exists(_.host == host) + taskAttempts(taskIndex).exists { info => info.running && info.host == host } } private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ca6a7e5db3b17..127c6bf825acc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -880,6 +880,59 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) } + test("speculative task should not run on a given host where another attempt " + + "is already running on") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler( + sc, ("execA", "host1"), ("execB", "host2")) + val taskSet = FakeTask.createTaskSet(1, + Seq(TaskLocation("host1", "execA"), TaskLocation("host2", "execB"))) + val clock = new ManualClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + + // let task0.0 run on host1 + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index == 0) + val info1 = manager.taskAttempts(0)(0) + assert(info1.running === true) + assert(info1.host === "host1") + + // long time elapse, and task0.0 is still running, + // so we launch a speculative task0.1 on host2 + clock.advance(1000) + manager.speculatableTasks += 0 + assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 0) + val info2 = manager.taskAttempts(0)(0) + assert(info2.running === true) + assert(info2.host === "host2") + assert(manager.speculatableTasks.size === 0) + + // now, task0 has two copies running on host1, host2 separately, + // so we can not launch a speculative task on any hosts. + manager.speculatableTasks += 0 + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) === None) + assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None) + assert(manager.speculatableTasks.size === 1) + + // after a long long time, task0.0 failed, and task0.0 can not re-run since + // there's already a running copy. + clock.advance(1000) + info1.finishTime = clock.getTimeMillis() + assert(info1.running === false) + + // time goes on, and task0.1 is still running + clock.advance(1000) + // so we try to launch a new speculative task + // we can not run it on host2, because task0.1 is already running on + assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None) + // we successfully launch a speculative task0.2 on host1, since there's + // no more running copy of task0 + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0) + val info3 = manager.taskAttempts(0)(0) + assert(info3.running === true) + assert(info3.host === "host1") + assert(manager.speculatableTasks.size === 0) + } + test("node-local tasks should be scheduled right away " + "when there are only node-local and no-preference tasks") { sc = new SparkContext("local", "test") From 2ed958418ff182bca0a3af1bf35999130312e78f Mon Sep 17 00:00:00 2001 From: wuyi Date: Wed, 11 Apr 2018 09:48:30 +0800 Subject: [PATCH 2/6] address review comments --- .../spark/scheduler/TaskSetManagerSuite.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 127c6bf825acc..94eed00090a75 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -880,8 +880,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) } - test("speculative task should not run on a given host where another attempt " + - "is already running on") { + test("SPARK-23888: speculative task should not run on a given host " + + "where another attempt is already running on") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler( sc, ("execA", "host1"), ("execB", "host2")) @@ -893,7 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // let task0.0 run on host1 assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index == 0) val info1 = manager.taskAttempts(0)(0) - assert(info1.running === true) + assert(info1.running) assert(info1.host === "host1") // long time elapse, and task0.0 is still running, @@ -902,7 +902,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg manager.speculatableTasks += 0 assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 0) val info2 = manager.taskAttempts(0)(0) - assert(info2.running === true) + assert(info2.running) assert(info2.host === "host2") assert(manager.speculatableTasks.size === 0) @@ -917,7 +917,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // there's already a running copy. clock.advance(1000) info1.finishTime = clock.getTimeMillis() - assert(info1.running === false) + manager.handleFailedTask(info1.taskId, TaskState.FAILED, UnknownReason) + assert(!info1.running) // time goes on, and task0.1 is still running clock.advance(1000) @@ -928,7 +929,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // no more running copy of task0 assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0) val info3 = manager.taskAttempts(0)(0) - assert(info3.running === true) + assert(info3.running) assert(info3.host === "host1") assert(manager.speculatableTasks.size === 0) } From 5901728d6be8ad33e39d56006a2bc8cc02cfff38 Mon Sep 17 00:00:00 2001 From: wuyi Date: Thu, 12 Apr 2018 08:51:01 +0800 Subject: [PATCH 3/6] address comments --- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 94eed00090a75..5180545c2f990 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -880,8 +880,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) } - test("SPARK-23888: speculative task should not run on a given host " + - "where another attempt is already running on") { + test("SPARK-23888: speculative task cannot run on a host with another " + + "running attempt, but can run on a host with a failed attempt.") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler( sc, ("execA", "host1"), ("execB", "host2")) @@ -916,7 +916,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // after a long long time, task0.0 failed, and task0.0 can not re-run since // there's already a running copy. clock.advance(1000) - info1.finishTime = clock.getTimeMillis() manager.handleFailedTask(info1.taskId, TaskState.FAILED, UnknownReason) assert(!info1.running) From 500cc77a3cf942730a625c81c7536455510d42f0 Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 17 Apr 2018 22:47:53 +0800 Subject: [PATCH 4/6] correct the comment --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 97df3cdd16e95..6e9a2c76b73a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -287,9 +287,9 @@ private[spark] class TaskSetManager( None } - /** Check whether a task is currently running an attempt on a given host */ + /** Check whether a task once run an attempt on a given host */ private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = { - taskAttempts(taskIndex).exists { info => info.running && info.host == host } + taskAttempts(taskIndex).exists(_.host == host) } private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = { From 0c6f3058a5c0af4a6e9cd1a90d43230805305df5 Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 17 Apr 2018 22:49:21 +0800 Subject: [PATCH 5/6] remove UT --- .../spark/scheduler/TaskSetManagerSuite.scala | 53 ------------------- 1 file changed, 53 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 5180545c2f990..ca6a7e5db3b17 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -880,59 +880,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) } - test("SPARK-23888: speculative task cannot run on a host with another " + - "running attempt, but can run on a host with a failed attempt.") { - sc = new SparkContext("local", "test") - sched = new FakeTaskScheduler( - sc, ("execA", "host1"), ("execB", "host2")) - val taskSet = FakeTask.createTaskSet(1, - Seq(TaskLocation("host1", "execA"), TaskLocation("host2", "execB"))) - val clock = new ManualClock - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - - // let task0.0 run on host1 - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index == 0) - val info1 = manager.taskAttempts(0)(0) - assert(info1.running) - assert(info1.host === "host1") - - // long time elapse, and task0.0 is still running, - // so we launch a speculative task0.1 on host2 - clock.advance(1000) - manager.speculatableTasks += 0 - assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 0) - val info2 = manager.taskAttempts(0)(0) - assert(info2.running) - assert(info2.host === "host2") - assert(manager.speculatableTasks.size === 0) - - // now, task0 has two copies running on host1, host2 separately, - // so we can not launch a speculative task on any hosts. - manager.speculatableTasks += 0 - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) === None) - assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None) - assert(manager.speculatableTasks.size === 1) - - // after a long long time, task0.0 failed, and task0.0 can not re-run since - // there's already a running copy. - clock.advance(1000) - manager.handleFailedTask(info1.taskId, TaskState.FAILED, UnknownReason) - assert(!info1.running) - - // time goes on, and task0.1 is still running - clock.advance(1000) - // so we try to launch a new speculative task - // we can not run it on host2, because task0.1 is already running on - assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None) - // we successfully launch a speculative task0.2 on host1, since there's - // no more running copy of task0 - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0) - val info3 = manager.taskAttempts(0)(0) - assert(info3.running) - assert(info3.host === "host1") - assert(manager.speculatableTasks.size === 0) - } - test("node-local tasks should be scheduled right away " + "when there are only node-local and no-preference tasks") { sc = new SparkContext("local", "test") From e44d80b70985f9be23ccafd0868e29c946ab4a5a Mon Sep 17 00:00:00 2001 From: wuyi Date: Mon, 23 Apr 2018 22:16:23 +0800 Subject: [PATCH 6/6] address comment --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6e9a2c76b73a7..8a96a7692f614 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -287,7 +287,7 @@ private[spark] class TaskSetManager( None } - /** Check whether a task once run an attempt on a given host */ + /** Check whether a task once ran an attempt on a given host */ private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = { taskAttempts(taskIndex).exists(_.host == host) }