From acca0e3da2da21e4e184f01b4e8f7b6b8c05ee1d Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Wed, 31 Jan 2018 14:27:21 +0800
Subject: [PATCH 01/10] [SPARK-23040][CORE]: Returns interruptible iterator for
 shuffle reader

Before this commit, a non-interruptible iterator is returned if an aggregator
or ordering is specified.
---
 .../shuffle/BlockStoreShuffleReader.scala   | 11 +++++-
 .../apache/spark/JobCancellationSuite.scala | 37 +++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 0562d45ff57c5..163c4da95f1ef 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -95,7 +95,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     }
 
     // Sort the output if there is a sort ordering defined.
-    dep.keyOrdering match {
+    val resultIter = dep.keyOrdering match {
       case Some(keyOrd: Ordering[K]) =>
         // Create an ExternalSorter to sort the data.
         val sorter =
@@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
         context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
         context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+        // Use completion callback to stop sorter if task was cancelled.
+        context.addTaskCompletionListener(tc => {
+          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
+          // CompletionIterator. Another way would be making sorter.stop idempotent.
+          if (tc.isInterrupted()) { sorter.stop() }
+        })
         CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
       case None =>
         aggregatedIter
     }
+    // Use another interruptible iterator here to support task cancellation as aggregator or(and)
+    // sorter may have consumed previous interruptible iterator.
+    new InterruptibleIterator[Product2[K, C]](context, resultIter)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 8a77aea75a992..3d2b1f3cd9cba 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark
 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
@@ -320,6 +321,41 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     f2.get()
   }
 
+  test("Interruptible iterator of shuffle reader") {
+    import JobCancellationSuite._
+    sc = new SparkContext("local[2]", "test")
+
+    val f = sc.parallelize(1 to 1000, 2).map { i => (i, i) }
+      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
+      .mapPartitions { iter =>
+        taskStartedSemaphore.release()
+        // Small delay to ensure that foreach is cancelled if task is killed
+        Thread.sleep(1000)
+        iter
+      }.foreachAsync { _ =>
+        executionOfInterruptibleCounter.getAndIncrement()
+      }
+
+    val sem = new Semaphore(0)
+    Future {
+      taskStartedSemaphore.acquire()
+      f.cancel()
+      sem.release()
+    }
+
+    sem.acquire()
+
+    val e = intercept[SparkException] { f.get() }.getCause
+
+    assert(executionOfInterruptibleCounter.get() === 0)
+    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+
+    // Small delay to ensure tasks are actually finished or killed
+    Thread.sleep(2000)
+    assert(executionOfInterruptibleCounter.get() === 0)
+
+  }
+
   def testCount() {
     // Cancel before launching any tasks
     {
@@ -384,4 +420,5 @@ object JobCancellationSuite {
   val taskStartedSemaphore = new Semaphore(0)
   val taskCancelledSemaphore = new Semaphore(0)
   val twoJobsSharingStageSemaphore = new Semaphore(0)
+  val executionOfInterruptibleCounter = new AtomicInteger(0)
 }
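Note: PATCH 01 works because InterruptibleIterator re-checks the task's kill flag on every
hasNext() call. Roughly, org.apache.spark.InterruptibleIterator behaves like the following
paraphrased sketch (not a verbatim copy of the Spark source):

    // Delegates iteration, but fails fast once the task has been marked as killed.
    class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
      extends Iterator[T] {

      override def hasNext: Boolean = {
        // Throws TaskKilledException if the task has been interrupted, stopping the consumer.
        context.killTaskIfInterrupted()
        delegate.hasNext
      }

      override def next(): T = delegate.next()
    }

When an aggregator or key ordering is configured, the aggregator/sorter eagerly drains the
original interruptible fetch iterator, so the iterator handed back to the caller no longer
performs this check; wrapping resultIter at the end of read() restores it.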
From ddeffd825770314bf08cff9c02d8dad57b387172 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Fri, 9 Feb 2018 14:35:38 +0800
Subject: [PATCH 02/10] Update comments and tests.
---
 .../shuffle/BlockStoreShuffleReader.scala   |  8 ++--
 .../apache/spark/JobCancellationSuite.scala | 44 ++++++++++++-------
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 163c4da95f1ef..5ef4237ae5ea5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -104,11 +104,9 @@ private[spark] class BlockStoreShuffleReader[K, C](
         context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
         context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
-        // Use completion callback to stop sorter if task was cancelled.
-        context.addTaskCompletionListener(tc => {
-          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
-          // CompletionIterator. Another way would be making sorter.stop idempotent.
-          if (tc.isInterrupted()) { sorter.stop() }
+        // Use completion callback to stop sorter if task was completed(either finished/cancelled).
+        context.addTaskCompletionListener(_ => {
+          sorter.stop()
         })
         CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
       case None =>
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 3d2b1f3cd9cba..6b6c9ee83a4b1 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -23,11 +23,9 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.concurrent.duration._
-
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
-
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -323,38 +321,52 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
 
   test("Interruptible iterator of shuffle reader") {
     import JobCancellationSuite._
-    sc = new SparkContext("local[2]", "test")
+    val numSlice = 2
+    sc = new SparkContext(s"local[$numSlice]", "test")
 
-    val f = sc.parallelize(1 to 1000, 2).map { i => (i, i) }
+    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
       .repartitionAndSortWithinPartitions(new HashPartitioner(2))
       .mapPartitions { iter =>
         taskStartedSemaphore.release()
-        // Small delay to ensure that foreach is cancelled if task is killed
-        Thread.sleep(1000)
         iter
-      }.foreachAsync { _ =>
+      }.foreachAsync { x =>
+        if ( x._1 >= 10) { // this block of code is partially executed.
+          taskCancelledSemaphore.acquire()
+        }
         executionOfInterruptibleCounter.getAndIncrement()
       }
 
     val sem = new Semaphore(0)
+    val taskCompletedSem = new Semaphore(0)
     Future {
       taskStartedSemaphore.acquire()
      f.cancel()
       sem.release()
     }
 
-    sem.acquire()
+    sc.addSparkListener(new SparkListener {
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+        // release taskCancelledSemaphore when cancelTasks event has been posted
+        if (stageCompleted.stageInfo.stageId == 1) {
+          taskCancelledSemaphore.release(1000)
+        }
+      }
 
-    val e = intercept[SparkException] { f.get() }.getCause
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        if (taskEnd.stageId == 1) { // make sure task ends
+          taskCompletedSem.release()
+        }
+      }
+    })
 
-    assert(executionOfInterruptibleCounter.get() === 0)
+    sem.acquire()
+    val e = intercept[SparkException] { f.get() }.getCause
     assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
 
-    // Small delay to ensure tasks are actually finished or killed
-    Thread.sleep(2000)
-    assert(executionOfInterruptibleCounter.get() === 0)
-
-  }
+    taskCompletedSem.acquire(numSlice)
+    // 11 as |1..10| + |501|(another partition)
+    assert(executionOfInterruptibleCounter.get() <= 11)
+  }
 
   def testCount() {
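Note: after PATCH 02 the sorter may be stopped twice, once by the CompletionIterator when the
output is fully consumed and once by the completion listener when the task ends, so the change
relies on sorter.stop() being safe to call repeatedly. A minimal sketch of that cleanup pattern,
using a hypothetical SpillingBuffer in place of the real ExternalSorter and Spark-internal
helpers (CompletionIterator is private[spark]), so treat it as illustrative only:

    import org.apache.spark.TaskContext
    import org.apache.spark.util.CompletionIterator

    // Hypothetical resource with an idempotent stop(), mirroring what the patch assumes
    // about ExternalSorter: calling stop() a second time must be a no-op.
    class SpillingBuffer {
      private var stopped = false
      def iterator: Iterator[Int] = Iterator(1, 2, 3)
      def stop(): Unit = if (!stopped) { stopped = true /* release memory/disk here */ }
    }

    def readSorted(context: TaskContext, buffer: SpillingBuffer): Iterator[Int] = {
      // Always release on task completion, whether the task finished, failed or was killed ...
      context.addTaskCompletionListener(_ => buffer.stop())
      // ... and also release eagerly once the iterator is exhausted on the happy path.
      CompletionIterator[Int, Iterator[Int]](buffer.iterator, buffer.stop())
    }

Compared with PATCH 01, this drops the isInterrupted() check: stopping unconditionally covers
normal completion, failure and cancellation with a single code path.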
From 88e86e0ef2fc069cb0c6531979b9ae713bc88c90 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Mon, 26 Feb 2018 20:23:18 +0800
Subject: [PATCH 03/10] Fix style issues and update comments
---
 .../spark/shuffle/BlockStoreShuffleReader.scala  |  2 +-
 .../org/apache/spark/JobCancellationSuite.scala  | 16 +++++++++++++---
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 5ef4237ae5ea5..85fd1bfefe48a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -104,7 +104,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
         context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
         context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
-        // Use completion callback to stop sorter if task was completed(either finished/cancelled).
+        // Use completion callback to stop sorter if task was finished/cancelled.
         context.addTaskCompletionListener(_ => {
           sorter.stop()
         })
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 6b6c9ee83a4b1..04ec3a406dd49 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -23,8 +23,10 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
+
 import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.util.ThreadUtils
 
@@ -320,6 +322,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("Interruptible iterator of shuffle reader") {
+    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
+    // execution and a counter is used to make sure that the corresponding tasks are indeed
+    // cancelled.
     import JobCancellationSuite._
     val numSlice = 2
     sc = new SparkContext(s"local[$numSlice]", "test")
@@ -330,7 +335,11 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         taskStartedSemaphore.release()
         iter
       }.foreachAsync { x =>
-        if ( x._1 >= 10) { // this block of code is partially executed.
+        if (x._1 >= 10) {
+          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
+          // next iteration will be cancelled if the source iterator is interruptible. Then in this
+          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
+          // element in another partition(assuming no ordering guarantee).
           taskCancelledSemaphore.acquire()
         }
         executionOfInterruptibleCounter.getAndIncrement()
@@ -353,18 +362,19 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       }
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-        if (taskEnd.stageId == 1) { // make sure task ends
+        if (taskEnd.stageId == 1) { // make sure tasks are completed
           taskCompletedSem.release()
         }
       }
     })
 
+    // Make sure code in the Future block is finished, a.k.a tasks are being cancelled.
     sem.acquire()
     val e = intercept[SparkException] { f.get() }.getCause
     assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
 
+    // Make sure tasks are indeed completed.
     taskCompletedSem.acquire(numSlice)
-    // 11 as |1..10| + |501|(another partition)
     assert(executionOfInterruptibleCounter.get() <= 11)
   }
From ba2f355dca21f1baa7cad82199402dcec1798584 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Tue, 27 Feb 2018 14:09:38 +0800
Subject: [PATCH 04/10] Remove unnecessary Semaphore and Future block
---
 .../scala/org/apache/spark/JobCancellationSuite.scala | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 04ec3a406dd49..b4a5a5d9b9433 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -345,13 +345,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         executionOfInterruptibleCounter.getAndIncrement()
       }
 
-    val sem = new Semaphore(0)
     val taskCompletedSem = new Semaphore(0)
-    Future {
-      taskStartedSemaphore.acquire()
-      f.cancel()
-      sem.release()
-    }
 
     sc.addSparkListener(new SparkListener {
       override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
         // release taskCancelledSemaphore when cancelTasks event has been posted
         if (stageCompleted.stageInfo.stageId == 1) {
           taskCancelledSemaphore.release(1000)
         }
@@ -368,8 +362,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       }
     })
 
-    // Make sure code in the Future block is finished, a.k.a tasks are being cancelled.
-    sem.acquire()
+    taskStartedSemaphore.acquire()
+    f.cancel()
+
     val e = intercept[SparkException] { f.get() }.getCause
     assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
From d6ed9a15e24c414251ffaf09839eb5b3c0567d75 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Tue, 27 Feb 2018 22:18:12 +0800
Subject: [PATCH 05/10] Use single partition in unit test
---
 .../scala/org/apache/spark/JobCancellationSuite.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index b4a5a5d9b9433..ddf76dee26d21 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -326,11 +326,11 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // execution and a counter is used to make sure that the corresponding tasks are indeed
     // cancelled.
     import JobCancellationSuite._
-    val numSlice = 2
+    val numSlice = 1
     sc = new SparkContext(s"local[$numSlice]", "test")
 
-    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
-      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
+    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
+      .repartitionAndSortWithinPartitions(new HashPartitioner(numSlice))
       .mapPartitions { iter =>
         taskStartedSemaphore.release()
         iter
@@ -338,8 +338,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         if (x._1 >= 10) {
           // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
           // next iteration will be cancelled if the source iterator is interruptible. Then in this
-          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
-          // element in another partition(assuming no ordering guarantee).
+          // case, the maximum num of increment would be 10(|1...10|)
          taskCancelledSemaphore.acquire()
         }
         executionOfInterruptibleCounter.getAndIncrement()
@@ -365,12 +364,12 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     taskStartedSemaphore.acquire()
     f.cancel()
 
-    val e = intercept[SparkException] { f.get() }.getCause
+    val e = intercept[SparkException](f.get()).getCause
     assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
 
     // Make sure tasks are indeed completed.
     taskCompletedSem.acquire(numSlice)
-    assert(executionOfInterruptibleCounter.get() <= 11)
+    assert(executionOfInterruptibleCounter.get() <= 10)
   }
 
   def testCount() {

From 8c15c564c7d2d0adc0cfd725e34dbd359c6a0ab6 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Tue, 27 Feb 2018 22:34:31 +0800
Subject: [PATCH 06/10] Add more comments for the invocation of `f.cancel()`
---
 .../test/scala/org/apache/spark/JobCancellationSuite.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index ddf76dee26d21..97923872af847 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -362,6 +362,12 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     taskStartedSemaphore.acquire()
+    // Job is cancelled when:
+    // 1. task in reduce stage has been started, guaranteed by previous line.
+    // 2. task in reduce stage is blocked after processing at most 10 records as
+    //    taskCancelledSemaphore is not released until cancelTasks event is posted
+    // After job being cancelled, task in reduce stage will be cancelled and no more iteration are
+    // executed.
     f.cancel()
 
     val e = intercept[SparkException](f.get()).getCause
From 756e0b7336fff3c72eca70c2ab489600211b9253 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 1 Mar 2018 11:04:29 +0800
Subject: [PATCH 07/10] Hardcode numSlice(num of partition) to 1
---
 .../scala/org/apache/spark/JobCancellationSuite.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 97923872af847..e3b2679a918b4 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -326,11 +326,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // execution and a counter is used to make sure that the corresponding tasks are indeed
     // cancelled.
     import JobCancellationSuite._
-    val numSlice = 1
-    sc = new SparkContext(s"local[$numSlice]", "test")
+    sc = new SparkContext(s"local", "test")
 
-    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
-      .repartitionAndSortWithinPartitions(new HashPartitioner(numSlice))
+    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+      .repartitionAndSortWithinPartitions(new HashPartitioner(1))
       .mapPartitions { iter =>
         taskStartedSemaphore.release()
         iter
@@ -374,7 +373,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
 
     // Make sure tasks are indeed completed.
-    taskCompletedSem.acquire(numSlice)
+    taskCompletedSem.acquire()
     assert(executionOfInterruptibleCounter.get() <= 10)
   }

From 2061d0a37e5a885b8d0961df841f34fa9da79709 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 1 Mar 2018 15:47:41 +0800
Subject: [PATCH 08/10] Reset global semaphores as they may interfere with each
 other in multiple tests

Move addSparkListener to beginning of test
---
 .../apache/spark/JobCancellationSuite.scala | 36 ++++++++++---------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index e3b2679a918b4..95b1f49fb4343 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -41,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   override def afterEach() {
     try {
       resetSparkContext()
+      // Reset semaphores if used by multiple tests.
+      // Note: if other semaphores are shared by multiple tests, please reset them in this block
+      JobCancellationSuite.taskStartedSemaphore.drainPermits()
+      JobCancellationSuite.taskCancelledSemaphore.drainPermits()
     } finally {
       super.afterEach()
     }
@@ -326,22 +330,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // execution and a counter is used to make sure that the corresponding tasks are indeed
     // cancelled.
     import JobCancellationSuite._
-    sc = new SparkContext(s"local", "test")
-
-    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
-      .repartitionAndSortWithinPartitions(new HashPartitioner(1))
-      .mapPartitions { iter =>
-        taskStartedSemaphore.release()
-        iter
-      }.foreachAsync { x =>
-        if (x._1 >= 10) {
-          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
-          // next iteration will be cancelled if the source iterator is interruptible. Then in this
-          // case, the maximum num of increment would be 10(|1...10|)
-          taskCancelledSemaphore.acquire()
-        }
-        executionOfInterruptibleCounter.getAndIncrement()
-      }
+    sc = new SparkContext("local[2]", "test interruptible iterator")
 
     val taskCompletedSem = new Semaphore(0)
 
@@ -360,6 +349,21 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       }
     })
 
+    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+      .repartitionAndSortWithinPartitions(new HashPartitioner(1))
+      .mapPartitions { iter =>
+        taskStartedSemaphore.release()
+        iter
+      }.foreachAsync { x =>
+        if (x._1 >= 10) {
+          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
+          // next iteration will be cancelled if the source iterator is interruptible. Then in this
+          // case, the maximum num of increment would be 10(|1...10|)
+          taskCancelledSemaphore.acquire()
+        }
+        executionOfInterruptibleCounter.getAndIncrement()
+      }
+
     taskStartedSemaphore.acquire()
     // Job is cancelled when:
     // 1. task in reduce stage has been started, guaranteed by previous line.
     // 2. task in reduce stage is blocked after processing at most 10 records as
From a3d8ad56f0709c343e508c8b636083243f9ffdd2 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 1 Mar 2018 15:57:00 +0800
Subject: [PATCH 09/10] Use lowercase for test name.
---
 core/src/test/scala/org/apache/spark/JobCancellationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 95b1f49fb4343..21c4c9b897325 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -325,7 +325,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     f2.get()
   }
 
-  test("Interruptible iterator of shuffle reader") {
+  test("interruptible iterator of shuffle reader") {
     // In this test case, we create a Spark job of two stages. The second stage is cancelled during
     // execution and a counter is used to make sure that the corresponding tasks are indeed
     // cancelled.

From 28119e9e191e0c2ec2acd2a12643e6bc00f5cca4 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 1 Mar 2018 23:53:15 +0800
Subject: [PATCH 10/10] Add comment for global variables
---
 .../test/scala/org/apache/spark/JobCancellationSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 21c4c9b897325..3b793bb231cf3 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -41,10 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   override def afterEach() {
     try {
       resetSparkContext()
-      // Reset semaphores if used by multiple tests.
-      // Note: if other semaphores are shared by multiple tests, please reset them in this block
       JobCancellationSuite.taskStartedSemaphore.drainPermits()
       JobCancellationSuite.taskCancelledSemaphore.drainPermits()
+      JobCancellationSuite.twoJobsSharingStageSemaphore.drainPermits()
+      JobCancellationSuite.executionOfInterruptibleCounter.set(0)
     } finally {
       super.afterEach()
     }
@@ -442,6 +442,7 @@ object JobCancellationSuite {
+  // To avoid any headaches, reset these global variables in the companion class's afterEach block
   val taskStartedSemaphore = new Semaphore(0)
   val taskCancelledSemaphore = new Semaphore(0)
   val twoJobsSharingStageSemaphore = new Semaphore(0)
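Note: the whole series rests on the observation in PATCH 01 that a buffering stage (the
aggregator or the ExternalSorter) drains its input before handing anything downstream, so the
original interruptible fetch iterator is never consulted again. A toy, self-contained
illustration of that failure mode and of the re-wrapping fix, with a simplified stand-in for
the sorter (not Spark source):

    import org.apache.spark.{InterruptibleIterator, TaskContext}

    // A "sorter-like" stage: it eagerly drains `input`, so once it returns, nothing downstream
    // would ever poll the task's kill flag again unless the buffered output is re-wrapped.
    def bufferedAndSorted[T](context: TaskContext, input: Iterator[T])
        (implicit ord: Ordering[T]): Iterator[T] = {
      val drained = input.toList.sorted   // consumes the (possibly interruptible) input completely
      // Re-wrap so each hasNext() on the sorted output still observes task kills, which is
      // what the final `new InterruptibleIterator(context, resultIter)` in read() achieves.
      new InterruptibleIterator[T](context, drained.iterator)
    }

The accompanying test then only has to show that once the job is cancelled, a blocked reduce
task stops iterating (the counter never exceeds the ten records processed before the block).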