From fd46d7a11eb912e7e88a61f1031ab133cdd01d67 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Fri, 26 Apr 2019 00:05:38 +0800
Subject: [PATCH 1/2] fix leaked readlock

---
 .../scala/org/apache/spark/scheduler/ResultTask.scala | 11 ++++++++++-
 .../scala/org/apache/spark/SparkContextSuite.scala    |  9 +++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 857c89d7a98f5..b74d21ff451a8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -25,6 +25,7 @@ import java.util.Properties
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CompletionIterator
 
 /**
  * A task that sends back the output to the driver application.
@@ -87,7 +88,15 @@ private[spark] class ResultTask[T, U](
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
 
-    func(context, rdd.iterator(partition, context))
+    val iter = rdd.iterator(partition, context).asInstanceOf[InterruptibleIterator[T]]
+    val res = func(context, iter)
+    // SPARK-27568: operations like take() could not consume all elements when func() finished,
+    // which would lead to readLock on blocks leaked. So, we manually call completion() for
+    // those operations here.
+    if (iter.hasNext && iter.delegate.isInstanceOf[CompletionIterator[T, Iterator[T]]]) {
+      iter.delegate.asInstanceOf[CompletionIterator[T, Iterator[T]]].completion()
+    }
+    res
   }
 
   // This is only callable on the driver side.
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 3490eaf550ce6..05cf62aa25d5e 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -731,6 +731,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       resetSparkContext()
     }
   }
+
+  test("call take() on a cached rdd should not leak readLock on the block") {
+    val conf = new SparkConf()
+      .setAppName("test")
+      .setMaster("local")
+      .set("spark.storage.exceptionOnPinLeak", "true")
+    sc = new SparkContext(conf)
+    assert(sc.parallelize(Range(0, 10), 1).cache().take(1).head === 0)
+  }
 }
 
 object SparkContextSuite {

From 21ba1dc2b9af2855e4b258761c5f720a29dcd4c7 Mon Sep 17 00:00:00 2001
From: Ngone51
Date: Fri, 26 Apr 2019 14:42:19 +0800
Subject: [PATCH 2/2] add first()

---
 .../main/scala/org/apache/spark/scheduler/ResultTask.scala   | 2 +-
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index b74d21ff451a8..e717f4041df10 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -91,7 +91,7 @@ private[spark] class ResultTask[T, U](
     val iter = rdd.iterator(partition, context).asInstanceOf[InterruptibleIterator[T]]
     val res = func(context, iter)
     // SPARK-27568: operations like take() could not consume all elements when func() finished,
-    // which would lead to readLock on blocks leaked. So, we manually call completion() for
+    // which would lead to readLock on block leaked. So, we manually call completion() for
     // those operations here.
     if (iter.hasNext && iter.delegate.isInstanceOf[CompletionIterator[T, Iterator[T]]]) {
       iter.delegate.asInstanceOf[CompletionIterator[T, Iterator[T]]].completion()
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 05cf62aa25d5e..4573f82808ac8 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -732,13 +732,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
-  test("call take() on a cached rdd should not leak readLock on the block") {
+  test("SPARK-27568: call take()/first() on a cached rdd should not leak readLock on the block") {
     val conf = new SparkConf()
      .setAppName("test")
      .setMaster("local")
      .set("spark.storage.exceptionOnPinLeak", "true")
     sc = new SparkContext(conf)
+    // No exception, no pin leak
     assert(sc.parallelize(Range(0, 10), 1).cache().take(1).head === 0)
+    assert(sc.parallelize(Range(0, 10), 1).cache().first() === 0)
   }
 }
 
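
Note (not part of the patch): the mechanism the fix relies on, as a minimal standalone Scala sketch. Spark wraps the iterator over a cached block in a CompletionIterator whose completion callback releases the block's read lock once the iterator is fully drained; take()/first() stop early, so the callback never fires unless completion() is invoked manually, as the patched ResultTask.runTask now does. Everything below is illustrative: ReadLockLeakSketch, the readLockHeld flag, and this trimmed-down CompletionIterator are simplified stand-ins, not Spark's real classes; only the completion-on-exhaustion pattern mirrors org.apache.spark.util.CompletionIterator.

object ReadLockLeakSketch {

  // Simplified stand-in for org.apache.spark.util.CompletionIterator:
  // runs `completionFunction` exactly once, normally when the wrapped
  // iterator reports exhaustion.
  class CompletionIterator[A](sub: Iterator[A], completionFunction: () => Unit)
      extends Iterator[A] {
    private var completed = false

    def completion(): Unit = {
      if (!completed) {
        completed = true
        completionFunction()
      }
    }

    override def hasNext: Boolean = {
      val r = sub.hasNext
      if (!r) completion() // normal cleanup path: iterator fully consumed
      r
    }

    override def next(): A = sub.next()
  }

  def main(args: Array[String]): Unit = {
    var readLockHeld = true // models the pin taken when reading a cached block
    val iter = new CompletionIterator[Int](Iterator(1, 2, 3), () => readLockHeld = false)

    // A take(1)-style early exit: one element is consumed, the wrapped
    // iterator never reports exhaustion, so cleanup never runs.
    val first = iter.next()
    println(s"first = $first, readLock leaked = $readLockHeld") // leaked = true

    // The patch's remedy: if elements remain after func() returns,
    // call completion() manually to release the lock.
    if (iter.hasNext) iter.completion()
    println(s"after manual completion, readLock leaked = $readLockHeld") // false
  }
}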