From d2a600d5c0ab8a068cb23bdd422645d8b1a39f0b Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Thu, 13 Mar 2014 01:47:45 -0700
Subject: [PATCH 1/6] add sliding to rdd

---
 .../main/scala/org/apache/spark/rdd/RDD.scala |  16 +++
 .../org/apache/spark/rdd/SlidedRDD.scala      | 100 ++++++++++++++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  14 +++
 3 files changed, 130 insertions(+)
 create mode 100644 core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4afa7523dd802..9c69843008754 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -950,6 +950,22 @@ abstract class RDD[T: ClassTag](
    */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

+  /**
+   * Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
+   * window over them. The ordering is first based on the partition index and then the ordering of
+   * items within each partition. This is similar to sliding in Scala collections, except that it
+   * becomes an empty RDD if the window size is greater than the total number of items. It needs to
+   * trigger a Spark job if the parent RDD has more than one partitions and the window size is
+   * greater than 1.
+   */
+  def sliding(windowSize: Int): RDD[Array[T]] = {
+    if (windowSize == 1) {
+      this.map(Array(_))
+    } else {
+      new SlidedRDD[T](this, windowSize)
+    }
+  }
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala
new file mode 100644
index 0000000000000..e89f4cc0936de
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+
+private[spark]
+class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
+  extends Partition with Serializable {
+  override val index: Int = idx
+}
+
+/**
+ * Represents a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
+ * window over them. The ordering is first based on the partition index and then the ordering of
+ * items within each partition. This is similar to sliding in Scala collections, except that it
+ * becomes an empty RDD if the window size is greater than the total number of items. It needs to
+ * trigger a Spark job if the parent RDD has more than one partitions.
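+ *
+ * A usage sketch (illustrative only, assuming `sc` is an existing SparkContext):
+ * {{{
+ *   val rdd = sc.parallelize(1 to 5, 2)
+ *   rdd.sliding(3).collect()
+ *   // => Array(Array(1, 2, 3), Array(2, 3, 4), Array(3, 4, 5))
+ * }}}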
+ *
+ * @param parent the parent RDD
+ * @param windowSize the window size, must be greater than 1
+ *
+ * @see [[org.apache.spark.rdd.RDD#sliding]]
+ */
+private[spark]
+class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
+  extends RDD[Array[T]](parent) {
+
+  require(windowSize > 1, "Window size must be greater than 1.")
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
+    val part = split.asInstanceOf[SlidedRDDPartition[T]]
+    (firstParent[T].iterator(part.prev, context) ++ part.tail)
+      .sliding(windowSize)
+      .map(_.toArray)
+      .filter(_.size == windowSize)
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] =
+    firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev)
+
+  override def getPartitions: Array[Partition] = {
+    val parentPartitions = parent.partitions
+    val n = parentPartitions.size
+    if (n == 0) {
+      Array.empty
+    } else if (n == 1) {
+      Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty))
+    } else {
+      val n1 = n - 1
+      val w1 = windowSize - 1
+      // Get the first w1 items of each partition, starting from the second partition.
+      val nextHeads =
+        parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
+      val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]()
+      var i = 0
+      var partitionIndex = 0
+      while (i < n1) {
+        var j = i
+        val tail = mutable.ArrayBuffer[T]()
+        // Keep appending to the current tail until appended a head of size w1.
+        while (j < n1 && nextHeads(j).size < w1) {
+          tail ++= nextHeads(j)
+          j += 1
+        }
+        if (j < n1) {
+          tail ++= nextHeads(j)
+          j += 1
+        }
+        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
+        partitionIndex += 1
+        // Skip appended heads.
+        i = j
+      }
+      // If the head of last partition has size w1, we also need to add this partition.
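+      // nextHeads(k) holds the first w1 items of parent partition k + 1, so nextHeads(n1 - 1) is
+      // the head of the last parent partition. The loop above only builds splits for partitions
+      // 0 to n1 - 1, so complete windows that start inside the last partition (possible only when
+      // it holds at least w1 items, i.e. a full head) can only come from this extra split, whose
+      // tail is empty because no partition follows it.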
+      if (nextHeads(n1 - 1).size == w1) {
+        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
+      }
+      partitions.toArray
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 60bcada55245b..a5962406b2e1a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -553,4 +553,18 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val ids = ranked.map(_._1).distinct().collect()
     assert(ids.length === n)
   }
+
+  test("sliding") {
+    val data = 0 until 6
+    for (numPartitions <- 1 to 8) {
+      val rdd = sc.parallelize(data, numPartitions)
+      for (windowSize <- 1 to 6) {
+        val slided = rdd.sliding(windowSize).collect().map(_.toList).toList
+        val expected = data.sliding(windowSize).map(_.toList).toList
+        assert(slided === expected)
+      }
+      assert(rdd.sliding(7).collect().isEmpty,
+        "Should return an empty RDD if the window size is greater than the number of items.")
+    }
+  }
 }

From 5ee6001471b1897400fef1e35b5e10fbfb47395f Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Thu, 13 Mar 2014 11:49:04 -0700
Subject: [PATCH 2/6] add TODO

---
 core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala
index e89f4cc0936de..5d14bcdfdcb66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala
@@ -97,4 +97,6 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
       partitions.toArray
     }
   }
+
+  // TODO: Override methods such as aggregate, which only requires one Spark job.
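+  // (A possible reading of the TODO, not spelled out in the patch: getPartitions above already
+  // runs one job via runJob just to fetch partition heads, so a sliding-aware aggregate could
+  // merge per-partition results and boundary windows in a single job over the parent instead.)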
 }

From 284d991cf8c79a1ef7db79a9caa35a238e02338a Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Sat, 15 Mar 2014 10:12:41 -0700
Subject: [PATCH 3/6] change SlidedRDD to SlidingRDD

---
 .../main/scala/org/apache/spark/rdd/RDD.scala |  2 +-
 .../rdd/{SlidedRDD.scala => SlidingRDD.scala} | 18 +++++++++---------
 2 files changed, 10 insertions(+), 10 deletions(-)
 rename core/src/main/scala/org/apache/spark/rdd/{SlidedRDD.scala => SlidingRDD.scala} (81%)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9c69843008754..dfffa50cb2aed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -962,7 +962,7 @@ abstract class RDD[T: ClassTag](
     if (windowSize == 1) {
       this.map(Array(_))
     } else {
-      new SlidedRDD[T](this, windowSize)
+      new SlidingRDD[T](this, windowSize)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
similarity index 81%
rename from core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala
rename to core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
index 5d14bcdfdcb66..ac0016b18298d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{TaskContext, Partition}

 private[spark]
-class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
+class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
   extends Partition with Serializable {
   override val index: Int = idx
 }
@@ -41,13 +41,13 @@ class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T
  * @see [[org.apache.spark.rdd.RDD#sliding]]
  */
 private[spark]
-class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
+class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
   extends RDD[Array[T]](parent) {

-  require(windowSize > 1, "Window size must be greater than 1.")
+  require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")

   override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
-    val part = split.asInstanceOf[SlidedRDDPartition[T]]
+    val part = split.asInstanceOf[SlidingRDDPartition[T]]
     (firstParent[T].iterator(part.prev, context) ++ part.tail)
       .sliding(windowSize)
       .map(_.toArray)
@@ -55,7 +55,7 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
   }

   override def getPreferredLocations(split: Partition): Seq[String] =
-    firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev)
+    firstParent[T].preferredLocations(split.asInstanceOf[SlidingRDDPartition[T]].prev)

   override def getPartitions: Array[Partition] = {
     val parentPartitions = parent.partitions
     val n = parentPartitions.size
     if (n == 0) {
       Array.empty
     } else if (n == 1) {
-      Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty))
+      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty))
     } else {
       val n1 = n - 1
       val w1 = windowSize - 1
       // Get the first w1 items of each partition, starting from the second partition.
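       // (Note: the runJob call below eagerly fetches up to w1 items from each of partitions
       // 1 until n, which is why merely computing this RDD's partitions can trigger a job; the
       // trailing `true` appears to be runJob's allowLocal flag in this version of Spark.)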
       val nextHeads =
         parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
-      val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]()
+      val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
       var i = 0
       var partitionIndex = 0
       while (i < n1) {
         var j = i
         val tail = mutable.ArrayBuffer[T]()
         // Keep appending to the current tail until appended a head of size w1.
         while (j < n1 && nextHeads(j).size < w1) {
           tail ++= nextHeads(j)
           j += 1
         }
         if (j < n1) {
           tail ++= nextHeads(j)
           j += 1
         }
-        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
         partitionIndex += 1
         // Skip appended heads.
         i = j
       }
       // If the head of last partition has size w1, we also need to add this partition.
       if (nextHeads(n1 - 1).size == w1) {
-        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
       }
       partitions.toArray
     }

From 9916202e0c6bc9d183bc35f3f16302bb7fbbb644 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Sat, 15 Mar 2014 10:46:35 -0700
Subject: [PATCH 4/6] change RDD.sliding return type to RDD[Seq[T]]

---
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  4 ++--
 .../scala/org/apache/spark/rdd/SlidingRDD.scala | 17 ++++++++---------
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index dfffa50cb2aed..debe57883ed33 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -958,9 +958,9 @@ abstract class RDD[T: ClassTag](
    * trigger a Spark job if the parent RDD has more than one partitions and the window size is
    * greater than 1.
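    *
    * A quick sketch of the resulting Seq-based API (illustrative only, assuming `sc` is a
    * SparkContext):
    * {{{
    *   sc.parallelize(1 to 4, 2).sliding(2).collect()
    *   // => Array(Seq(1, 2), Seq(2, 3), Seq(3, 4))
    * }}}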
    */
-  def sliding(windowSize: Int): RDD[Array[T]] = {
+  def sliding(windowSize: Int): RDD[Seq[T]] = {
     if (windowSize == 1) {
-      this.map(Array(_))
+      this.map(Seq(_))
     } else {
       new SlidingRDD[T](this, windowSize)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
index ac0016b18298d..96e3442b878c1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{TaskContext, Partition}

 private[spark]
-class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
+class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T])
   extends Partition with Serializable {
   override val index: Int = idx
 }
@@ -42,16 +42,15 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[
  */
 private[spark]
 class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
-  extends RDD[Array[T]](parent) {
+  extends RDD[Seq[T]](parent) {

   require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")

-  override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
+  override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
     val part = split.asInstanceOf[SlidingRDDPartition[T]]
     (firstParent[T].iterator(part.prev, context) ++ part.tail)
       .sliding(windowSize)
-      .map(_.toArray)
-      .filter(_.size == windowSize)
+      .withPartial(false)
   }

   override def getPreferredLocations(split: Partition): Seq[String] =
@@ -63,7 +62,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
     if (n == 0) {
       Array.empty
     } else if (n == 1) {
-      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty))
+      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty))
     } else {
       val n1 = n - 1
       val w1 = windowSize - 1
@@ -75,7 +74,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
       var partitionIndex = 0
       while (i < n1) {
         var j = i
-        val tail = mutable.ArrayBuffer[T]()
+        val tail = mutable.ListBuffer[T]()
         // Keep appending to the current tail until appended a head of size w1.
         while (j < n1 && nextHeads(j).size < w1) {
           tail ++= nextHeads(j)
@@ -85,14 +84,14 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
           tail ++= nextHeads(j)
           j += 1
         }
-        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq)
         partitionIndex += 1
         // Skip appended heads.
         i = j
       }
       // If the head of last partition has size w1, we also need to add this partition.
       if (nextHeads(n1 - 1).size == w1) {
-        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty)
       }
       partitions.toArray
     }

From db6cb30da9ef7ce5ca473f32e709aedb2eeabc34 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Sat, 15 Mar 2014 10:59:13 -0700
Subject: [PATCH 5/6] remove unnecessary toSeq

---
 core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
index 96e3442b878c1..9d1f56732ea41 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
@@ -84,7 +84,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
           tail ++= nextHeads(j)
           j += 1
         }
-        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail)
         partitionIndex += 1
         // Skip appended heads.
         i = j

From cab9a52349a7ffcefeae7660836a6ea1b77d910f Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Sat, 15 Mar 2014 11:06:32 -0700
Subject: [PATCH 6/6] use last for the last element

---
 core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
index 9d1f56732ea41..df87bc5459699 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
@@ -90,7 +90,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
         i = j
       }
       // If the head of last partition has size w1, we also need to add this partition.
-      if (nextHeads(n1 - 1).size == w1) {
+      if (nextHeads.last.size == w1) {
         partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty)
       }
       partitions.toArray