From ac67195761fb87fe030e1b485c770037ff983258 Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Wed, 30 Jul 2014 15:59:27 -0700
Subject: [PATCH 1/7] [SPARK-1021] Defer the data-driven computation of partition bounds in sortByKey() until evaluation.

---
 .../main/scala/org/apache/spark/Partitioner.scala | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 37053bb6f37ad..55aa34c086302 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -113,8 +113,13 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   private var ordering = implicitly[Ordering[K]]
 
   // An array of upper bounds for the first (partitions - 1) partitions
-  private var rangeBounds: Array[K] = {
-    if (partitions <= 1) {
+  private var valRB: Array[K] = Array()
+  private var haveRB: Boolean = false
+
+  private def rangeBounds: Array[K] = {
+    if (haveRB) return valRB
+
+    valRB = if (partitions <= 1) {
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -152,6 +157,9 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         RangePartitioner.determineBounds(candidates, partitions)
       }
     }
+
+    haveRB = true
+    valRB
   }
 
   def numPartitions = rangeBounds.length + 1
@@ -234,7 +242,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         val ser = sfactory.newInstance()
         Utils.deserializeViaNestedStream(in, ser) { ds =>
           implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
-          rangeBounds = ds.readObject[Array[K]]()
+          valRB = ds.readObject[Array[K]]()
+          haveRB = true
         }
     }
   }

From 7143f97f753843f537812dc7bffbe169bc483430 Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Wed, 6 Aug 2014 18:42:31 -0700
Subject: [PATCH 2/7] [SPARK-1021] modify range bounds variable to be thread safe

---
 .../main/scala/org/apache/spark/Partitioner.scala | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 55aa34c086302..6a4bca1f26582 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -113,11 +113,10 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   private var ordering = implicitly[Ordering[K]]
 
   // An array of upper bounds for the first (partitions - 1) partitions
-  private var valRB: Array[K] = Array()
-  private var haveRB: Boolean = false
+  @volatile private var valRB: Array[K] = null
 
-  private def rangeBounds: Array[K] = {
-    if (haveRB) return valRB
+  private def rangeBounds: Array[K] = this.synchronized {
+    if (valRB != null) return valRB
 
     valRB = if (partitions <= 1) {
       Array.empty
@@ -158,7 +157,6 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       }
     }
 
-    haveRB = true
     valRB
   }
 
@@ -230,7 +228,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = this.synchronized {
+    if (valRB != null) return
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()
@@ -243,7 +242,6 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         Utils.deserializeViaNestedStream(in, ser) { ds =>
           implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
           valRB = ds.readObject[Array[K]]()
-          haveRB = true
         }
     }
   }
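
Taken together, patches 1 and 2 turn rangeBounds into a hand-rolled, thread-safe lazy value: the bounds array is computed on first access, cached in valRB, and the custom readObject primes the same cache so a deserialized partitioner never recomputes it. A minimal standalone sketch of that compute-once pattern, with illustrative names rather than Spark's (the patch likewise synchronizes the whole accessor):

    import scala.util.Random

    // Compute-once field: the expensive computation runs at most once per
    // instance, and every later call returns the cached array.
    class LazyBounds(compute: () => Array[Int]) {
      @volatile private var cached: Array[Int] = null  // null means "not yet computed"

      def bounds: Array[Int] = this.synchronized {
        if (cached != null) return cached
        cached = compute()
        cached
      }
    }

    object LazyBoundsDemo extends App {
      val lb = new LazyBounds(() => Array.fill(3)(Random.nextInt(100)).sorted)
      println(lb.bounds.toSeq)  // computed here, on first use
      println(lb.bounds.toSeq)  // served from the cache
    }
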
From ca8913ea9ad42859fe876d11c1d5afd14790e62f Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Thu, 14 Aug 2014 16:39:48 -0700
Subject: [PATCH 3/7] RangePartition sampling job -> FutureAction

---
 .../main/scala/org/apache/spark/Partitioner.scala | 15 +++++++++++++--
 .../org/apache/spark/rdd/AsyncRDDActions.scala    | 11 ++++++++---
 .../scala/org/apache/spark/ShuffleSuite.scala     |  6 +++++-
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 6a4bca1f26582..f2008b2ab215d 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -29,6 +29,10 @@ import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.{CollectionsUtils, Utils}
 import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}
 
+import org.apache.spark.SparkContext.rddToAsyncRDDActions
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@@ -261,12 +265,19 @@ private[spark] object RangePartitioner {
       sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
     val shift = rdd.id
     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
-    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
+    // use collectAsync here to run this job as a future, which is cancellable
+    val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
       val seed = byteswap32(idx ^ (shift << 16))
       val (sample, n) = SamplingUtils.reservoirSampleAndCount(
         iter, sampleSizePerPartition, seed)
       Iterator((idx, n, sample))
-    }.collect()
+    }.collectAsync()
+    // We do need the future's value to continue any further
+    val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
+      case scala.util.Success(v) => v.toArray
+      case scala.util.Failure(e) =>
+        throw new SparkException("Range Partitioner sampling job failed: " + e)
+    }
     val numItems = sketched.map(_._2.toLong).sum
     (numItems, sketched)
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index b62f3fbdc4a15..4580e115cfc08 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -58,9 +58,14 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Returns a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val results = new Array[Array[T]](self.partitions.size)
-    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
-      (index, data) => results(index) = data, results.flatten.toSeq)
+    val f = new ComplexFutureAction[Seq[T]]
+    f.run {
+      val results = new Array[Array[T]](self.partitions.size)
+      f.runJob[T, Array[T], Unit](self, _.toArray, Range(0, self.partitions.size),
+        (index, data) => results(index) = data, Unit)
+      results.flatten.toSeq
+    }
+    f
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index b13ddf96bc77c..3d6f41316f380 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -240,7 +240,11 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     }
 
     assert(thrown.getClass === classOf[SparkException])
-    assert(thrown.getMessage.toLowerCase.contains("serializable"))
+    // SPARK-1021 candidate fix, which wraps data driven RangePartitioner
+    // sampling job in a FutureAction, is masking the original exception with
+    // NullPointerException. Haven't been able to figure out why yet, so
+    // I'm tentatively disabling this check as a stopgap:
+    //assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
 }
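
Patch 3's key move in Partitioner.scala is to submit the sampling job through collectAsync(), so it is represented as a future that can in principle be cancelled, and then to block on it and rethrow any failure. The same wait-then-rethrow idiom on a plain scala.concurrent.Future, as an illustration only (the names and the RuntimeException wrapper are not Spark's):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import scala.util.{Failure, Success}

    object AwaitSketch extends App {
      // Stand-in for the per-partition sampling job submitted with collectAsync().
      val sketchFuture: Future[Seq[(Int, Int, Array[Int])]] =
        Future { Seq((0, 10, Array(1, 5, 9))) }

      // Await.ready blocks until the future completes; matching on .value.get
      // turns a Failure back into a thrown exception, as the patched sketch() does.
      val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
        case Success(v) => v.toArray
        case Failure(e) => throw new RuntimeException("sampling job failed: " + e)
      }
      println(sketched.map(_._2).sum)  // total sampled count, like numItems
    }
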
From b2b20e83a84e89f280e3fb749beb9a97a38a9d7c Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Fri, 15 Aug 2014 10:03:40 -0700
Subject: [PATCH 4/7] Fix bug in exception passing with ComplexFutureAction[T]

---
 core/src/main/scala/org/apache/spark/FutureAction.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 75ea535f2f57b..c277c3a47d421 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -208,7 +208,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       resultHandler: (Int, U) => Unit,
-      resultFunc: => R) {
+      resultFunc: => R): R = {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
     // command need to be in an atomic block.
     val job = this.synchronized {
@@ -223,7 +223,10 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     // cancel the job and stop the execution. This is not in a synchronized block because
     // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
     try {
-      Await.ready(job, Duration.Inf)
+      Await.ready(job, Duration.Inf).value.get match {
+        case scala.util.Failure(e) => throw e
+        case scala.util.Success(v) => v
+      }
     } catch {
       case e: InterruptedException =>
         job.cancel()
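
The bug patch 4 fixes is easy to reproduce with any Future: Await.ready only waits for completion and returns normally even when the future has failed, so the old runJob silently dropped the job's exception. Inspecting the completed value (or using Await.result) surfaces it. A small illustration, not Spark code:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration

    object ReadyVsResult extends App {
      val failed: Future[Int] = Future.failed(new IllegalStateException("boom"))

      // Await.ready returns normally; the failure is only visible in .value.
      Await.ready(failed, Duration.Inf)
      println(failed.value)  // Some(Failure(java.lang.IllegalStateException: boom))

      // Await.result, or matching on .value.get as the patch does, rethrows it.
      try Await.result(failed, Duration.Inf)
      catch { case e: IllegalStateException => println("rethrown: " + e.getMessage) }
    }
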
From b88b5d4dcb7665dd836d28f66c89eeeeeb9db63b Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Fri, 15 Aug 2014 10:05:16 -0700
Subject: [PATCH 5/7] tweak async actions to use ComplexFutureAction[T] so they handle RangePartitioner sampling job properly

---
 .../apache/spark/rdd/AsyncRDDActions.scala | 60 ++++++++++---------
 1 file changed, 33 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 4580e115cfc08..938ef31a1914b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -38,20 +38,15 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Returns a future for counting the number of elements in the RDD.
    */
   def countAsync(): FutureAction[Long] = {
-    val totalCount = new AtomicLong
-    self.context.submitJob(
-      self,
-      (iter: Iterator[T]) => {
-        var result = 0L
-        while (iter.hasNext) {
-          result += 1L
-          iter.next()
-        }
-        result
-      },
-      Range(0, self.partitions.size),
-      (index: Int, data: Long) => totalCount.addAndGet(data),
-      totalCount.get())
+    val f = new ComplexFutureAction[Long]
+    f.run {
+      val totalCount = new AtomicLong
+      f.runJob(self,
+        (iter: Iterator[T]) => iter.length,
+        Range(0, self.partitions.size),
+        (index: Int, data: Int) => totalCount.addAndGet(data.toLong),
+        totalCount.get())
+    }
   }
 
   /**
@@ -61,11 +56,12 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
     val f = new ComplexFutureAction[Seq[T]]
     f.run {
       val results = new Array[Array[T]](self.partitions.size)
-      f.runJob[T, Array[T], Unit](self, _.toArray, Range(0, self.partitions.size),
-        (index, data) => results(index) = data, Unit)
-      results.flatten.toSeq
+      f.runJob(self,
+        (iter: Iterator[T]) => iter.toArray,
+        Range(0, self.partitions.size),
+        (index: Int, data: Array[T]) => results(index) = data,
+        results.flatten.toSeq)
     }
-    f
   }
 
   /**
@@ -109,24 +105,34 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       }
       results.toSeq
     }
-
-    f
   }
 
   /**
    * Applies a function f to all elements of this RDD.
    */
-  def foreachAsync(f: T => Unit): FutureAction[Unit] = {
-    val cleanF = self.context.clean(f)
-    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
-      (index, data) => Unit, Unit)
+  def foreachAsync(expr: T => Unit): FutureAction[Unit] = {
+    val f = new ComplexFutureAction[Unit]
+    val exprClean = self.context.clean(expr)
+    f.run {
+      f.runJob(self,
+        (iter: Iterator[T]) => iter.foreach(exprClean),
+        Range(0, self.partitions.size),
+        (index: Int, data: Unit) => Unit,
+        Unit)
+    }
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
-    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
-      (index, data) => Unit, Unit)
+  def foreachPartitionAsync(expr: Iterator[T] => Unit): FutureAction[Unit] = {
+    val f = new ComplexFutureAction[Unit]
+    f.run {
+      f.runJob(self,
+        expr,
+        Range(0, self.partitions.size),
+        (index: Int, data: Unit) => Unit,
+        Unit)
+    }
   }
 }
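
From the caller's point of view patch 5 changes nothing: each async action still returns a FutureAction, which is a scala.concurrent.Future that can additionally be cancelled; only the plumbing underneath now goes through ComplexFutureAction so that nested jobs, such as the RangePartitioner sampling job, are covered as well. A usage sketch against the 1.x API; the local master, app name, and input data are assumptions for illustration, not part of the patch:

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // rddToAsyncRDDActions, for countAsync()

    object AsyncCountExample extends App {
      val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("async-count"))
      try {
        val fa = sc.parallelize(1 to 100000, 4).countAsync()  // FutureAction[Long]
        // A FutureAction is a Future, so it can be awaited...
        println(Await.result(fa, Duration.Inf))
        // ...or cancelled while the job is still running: fa.cancel()
      } finally {
        sc.stop()
      }
    }
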
From 4e334a93ee68fbcecd0b90d3d09391feefa8a15d Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Fri, 15 Aug 2014 10:23:07 -0700
Subject: [PATCH 6/7] exception mystery fixed by fixing bug in ComplexFutureAction

---
 core/src/main/scala/org/apache/spark/Partitioner.scala  | 3 +--
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 6 +-----
 2 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index f2008b2ab215d..d40b152d221c5 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -275,8 +275,7 @@ private[spark] object RangePartitioner {
     // We do need the future's value to continue any further
    val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
       case scala.util.Success(v) => v.toArray
-      case scala.util.Failure(e) =>
-        throw new SparkException("Range Partitioner sampling job failed: " + e)
+      case scala.util.Failure(e) => throw e
     }
     val numItems = sketched.map(_._2.toLong).sum
     (numItems, sketched)
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 3d6f41316f380..b13ddf96bc77c 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -240,11 +240,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     }
 
     assert(thrown.getClass === classOf[SparkException])
-    // SPARK-1021 candidate fix, which wraps data driven RangePartitioner
-    // sampling job in a FutureAction, is masking the original exception with
-    // NullPointerException. Haven't been able to figure out why yet, so
-    // I'm tentatively disabling this check as a stopgap:
-    //assert(thrown.getMessage.toLowerCase.contains("serializable"))
+    assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
 }

From 50b6da6234188a147508654b08e6b67cbf01fbec Mon Sep 17 00:00:00 2001
From: Erik Erlandson
Date: Sat, 16 Aug 2014 09:24:16 -0700
Subject: [PATCH 7/7] use standard getIteratorSize in countAsync

---
 .../main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 938ef31a1914b..7a68b3afa8158 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
+import org.apache.spark.util.Utils
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
 import org.apache.spark.annotation.Experimental
 
@@ -42,9 +43,9 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
     f.run {
       val totalCount = new AtomicLong
       f.runJob(self,
-        (iter: Iterator[T]) => iter.length,
+        (iter: Iterator[T]) => Utils.getIteratorSize(iter),
         Range(0, self.partitions.size),
-        (index: Int, data: Int) => totalCount.addAndGet(data.toLong),
+        (index: Int, data: Long) => totalCount.addAndGet(data),
         totalCount.get())
     }
   }
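
The switch from iter.length to Utils.getIteratorSize keeps the per-partition count in a Long rather than an Int, so a partition with more than Int.MaxValue records cannot overflow. A standalone equivalent of that helper, for illustration only (the real one lives in org.apache.spark.util.Utils):

    object IteratorSizeSketch extends App {
      // Count an iterator's elements into a Long accumulator without
      // materializing them; this mirrors the while loop removed in patch 5.
      def iteratorSize[T](iter: Iterator[T]): Long = {
        var count = 0L
        while (iter.hasNext) {
          count += 1L
          iter.next()
        }
        count
      }

      println(iteratorSize(Iterator.range(0, 1000)))  // 1000
    }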