From 215a9bf5bc36bb53c68112aa5cd8a52152d8cd69 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 16 Feb 2014 23:39:39 -0800 Subject: [PATCH 01/23] Fix Java API for mapPartitionsWithIndex --- .../apache/spark/api/java/JavaRDDLike.scala | 12 ++++++----- .../java/org/apache/spark/JavaAPISuite.java | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 24a9925dbd22c..1cf0234aa0241 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -73,11 +73,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - def mapPartitionsWithIndex[R: ClassTag]( - f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]], - preservesPartitioning: Boolean = false): JavaRDD[R] = - new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), - preservesPartitioning)) + def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R], + preservesPartitioning: Boolean = false): JavaRDD[R] = { + import scala.collection.JavaConverters._ + def fn = (a: Int, b: Iterator[T]) => f.apply(a, asJavaIterator(b)).asScala + val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(f.elementType()) + new JavaRDD(newRdd)(f.elementType()) + } /** * Return a new RDD by applying a function to all elements of this RDD. 
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 20232e9fbb8d0..401d5901280cb 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -377,6 +377,26 @@ public void javaDoubleRDDHistoGram() { Assert.assertArrayEquals(expected_counts, histogram); } + @Test + public void mapPartitionsWithIndex() { + JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaRDD rddByIndex = + rdd.mapPartitionsWithIndex(new MapPartitionsWithIndexFunction() { + @Override + public Iterator call(Integer start, java.util.Iterator iter) { + List list = new ArrayList(); + int pos = start; + while (iter.hasNext()) { + list.add(iter.next() * pos); + pos += 1; + } + return list.iterator(); + } + }, false); + Assert.assertEquals(0, rddByIndex.first().intValue()); + } + + @Test public void map() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); From e2331ed31943a23edcfd166b22692c64d43ac93d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 17 Feb 2014 00:13:37 -0800 Subject: [PATCH 02/23] Check all the values --- core/src/test/java/org/apache/spark/JavaAPISuite.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 401d5901280cb..804ec6eec2214 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -394,6 +394,8 @@ public Iterator call(Integer start, java.util.Iterator iter) { } }, false); Assert.assertEquals(0, rddByIndex.first().intValue()); + Integer[] values = {0, 2, 6, 12, 20}; + Assert.assertEquals(Arrays.asList(values), rddByIndex.collect()); } From 0d624bf306fb3b5a2409943c9a0345df080975ec Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 23 Feb 2014 17:02:32 -0800 Subject: [PATCH 03/23] Add missing class --- 
.../MapPartitionsWithIndexFunction.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala new file mode 100644 index 0000000000000..451890c29cfa6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.java.function + +import scala.reflect.ClassTag + +/** + * A function that takes the a partition + */ +abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, + java.util.Iterator[A], java.util.Iterator[B]] { + def elementType() : ClassTag[B] = ClassTag.Any.asInstanceOf[ClassTag[B]] +} From 4421ecc35a235aee8fa24797be9c10b9bd43163a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 26 Feb 2014 19:40:48 -0800 Subject: [PATCH 04/23] Use fakeClassTag --- .../api/java/function/MapPartitionsWithIndexFunction.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala index 451890c29cfa6..61e0827bc6eaf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala @@ -19,10 +19,12 @@ package org.apache.spark.api.java.function import scala.reflect.ClassTag +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag + /** * A function that takes the a partition */ abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, java.util.Iterator[A], java.util.Iterator[B]] { - def elementType() : ClassTag[B] = ClassTag.Any.asInstanceOf[ClassTag[B]] + def elementType() : ClassTag[B] = fakeClassTag } From 8d849a129e3d6489d697fd24a6c1a3b410f8b3ce Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 16 Feb 2014 23:39:39 -0800 Subject: [PATCH 05/23] Fix Java API for mapPartitionsWithIndex --- .../apache/spark/api/java/JavaRDDLike.scala | 12 ++++++----- .../java/org/apache/spark/JavaAPISuite.java | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index af0114bee3f49..c07d4b85b213b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -73,11 +73,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - def mapPartitionsWithIndex[R: ClassTag]( - f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]], - preservesPartitioning: Boolean = false): JavaRDD[R] = - new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), - preservesPartitioning)) + def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R], + preservesPartitioning: Boolean = false): JavaRDD[R] = { + import scala.collection.JavaConverters._ + def fn = (a: Int, b: Iterator[T]) => f.apply(a, asJavaIterator(b)).asScala + val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(f.elementType()) + new JavaRDD(newRdd)(f.elementType()) + } /** * Return a new RDD by applying a function to all elements of this RDD. 
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 40e853c39ca99..2dfa52a86a355 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -414,6 +414,26 @@ public void javaDoubleRDDHistoGram() { Assert.assertArrayEquals(expected_counts, histogram); } + @Test + public void mapPartitionsWithIndex() { + JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaRDD rddByIndex = + rdd.mapPartitionsWithIndex(new MapPartitionsWithIndexFunction() { + @Override + public Iterator call(Integer start, java.util.Iterator iter) { + List list = new ArrayList(); + int pos = start; + while (iter.hasNext()) { + list.add(iter.next() * pos); + pos += 1; + } + return list.iterator(); + } + }, false); + Assert.assertEquals(0, rddByIndex.first().intValue()); + } + + @Test public void map() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); From 958efa4d217220ce90a90d04fccefe193c949409 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 17 Feb 2014 00:13:37 -0800 Subject: [PATCH 06/23] Check all the values --- core/src/test/java/org/apache/spark/JavaAPISuite.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 2dfa52a86a355..f2fbd3e1c7044 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -431,6 +431,8 @@ public Iterator call(Integer start, java.util.Iterator iter) { } }, false); Assert.assertEquals(0, rddByIndex.first().intValue()); + Integer[] values = {0, 2, 6, 12, 20}; + Assert.assertEquals(Arrays.asList(values), rddByIndex.collect()); } From 6ad1a3cec0e5b87f6747f06bb0a78cff6fe13a6b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 23 Feb 2014 17:02:32 -0800 Subject: [PATCH 07/23] Add missing class --- 
.../MapPartitionsWithIndexFunction.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala new file mode 100644 index 0000000000000..451890c29cfa6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.java.function + +import scala.reflect.ClassTag + +/** + * A function that takes the a partition + */ +abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, + java.util.Iterator[A], java.util.Iterator[B]] { + def elementType() : ClassTag[B] = ClassTag.Any.asInstanceOf[ClassTag[B]] +} From e64e1ad5181463b003e5f5b99911b2676b418f60 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 26 Feb 2014 19:40:48 -0800 Subject: [PATCH 08/23] Use fakeClassTag --- .../api/java/function/MapPartitionsWithIndexFunction.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala index 451890c29cfa6..61e0827bc6eaf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala @@ -19,10 +19,12 @@ package org.apache.spark.api.java.function import scala.reflect.ClassTag +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag + /** * A function that takes the a partition */ abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, java.util.Iterator[A], java.util.Iterator[B]] { - def elementType() : ClassTag[B] = ClassTag.Any.asInstanceOf[ClassTag[B]] + def elementType() : ClassTag[B] = fakeClassTag } From 8bfd3f3e2c3721bf192c196ea1a3a2d4a2d63a32 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 13:28:13 -0800 Subject: [PATCH 09/23] It compiles with the Java 8 happy pandas --- .../function/MapPartitionsWithIndexFunction.java} | 11 ++++------- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 8 ++++---- 2 files changed, 8 insertions(+), 11 deletions(-) rename 
core/src/main/{scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala => java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java} (73%) diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java similarity index 73% rename from core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala rename to core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java index 61e0827bc6eaf..83fd52be455c2 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java @@ -15,16 +15,13 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import scala.reflect.ClassTag - -import org.apache.spark.api.java.JavaSparkContext.fakeClassTag +import java.io.Serializable; /** * A function that takes the a partition */ -abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, - java.util.Iterator[A], java.util.Iterator[B]] { - def elementType() : ClassTag[B] = fakeClassTag +public interface MapPartitionsWithIndexFunction extends Function2, java.util.Iterator> { } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index c07d4b85b213b..1f3e1bab027ba 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -73,12 +73,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. 
*/ - def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R], + def mapPartitionsWithIndex[R: ClassTag](f: MapPartitionsWithIndexFunction[T, R], preservesPartitioning: Boolean = false): JavaRDD[R] = { import scala.collection.JavaConverters._ - def fn = (a: Int, b: Iterator[T]) => f.apply(a, asJavaIterator(b)).asScala - val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(f.elementType()) - new JavaRDD(newRdd)(f.elementType()) + def fn = (a: Int, b: Iterator[T]) => f.call(a, asJavaIterator(b)).asScala + val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning) + new JavaRDD(newRdd)(fakeClassTag) } /** From 36c78315240d41e4f08c590026c8b7daad1aef01 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 13:30:12 -0800 Subject: [PATCH 10/23] Remove old function --- .../MapPartitionsWithIndexFunction.scala | 30 ------------------- 1 file changed, 30 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala deleted file mode 100644 index 61e0827bc6eaf..0000000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -import scala.reflect.ClassTag - -import org.apache.spark.api.java.JavaSparkContext.fakeClassTag - -/** - * A function that takes the a partition - */ -abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, - java.util.Iterator[A], java.util.Iterator[B]] { - def elementType() : ClassTag[B] = fakeClassTag -} From 79d1bc18b3024f66c25db897fe66556ded7af78b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 17 Feb 2014 00:13:37 -0800 Subject: [PATCH 11/23] Check all the values --- .../java/org/apache/spark/JavaAPISuite.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 40e853c39ca99..f2fbd3e1c7044 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -414,6 +414,28 @@ public void javaDoubleRDDHistoGram() { Assert.assertArrayEquals(expected_counts, histogram); } + @Test + public void mapPartitionsWithIndex() { + JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaRDD rddByIndex = + rdd.mapPartitionsWithIndex(new MapPartitionsWithIndexFunction() { + @Override + public Iterator call(Integer start, java.util.Iterator iter) { + List list = new ArrayList(); + int pos = start; + while (iter.hasNext()) { + list.add(iter.next() * pos); + pos += 1; + } + return list.iterator(); + } + }, false); + Assert.assertEquals(0, rddByIndex.first().intValue()); + 
Integer[] values = {0, 2, 6, 12, 20}; + Assert.assertEquals(Arrays.asList(values), rddByIndex.collect()); + } + + @Test public void map() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); From ec80d7a5bfda45bc8b9d3d8283ca2c1db725021c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 23 Feb 2014 17:02:32 -0800 Subject: [PATCH 12/23] Add missing class --- .../MapPartitionsWithIndexFunction.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala new file mode 100644 index 0000000000000..451890c29cfa6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.java.function + +import scala.reflect.ClassTag + +/** + * A function that takes the a partition + */ +abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, + java.util.Iterator[A], java.util.Iterator[B]] { + def elementType() : ClassTag[B] = ClassTag.Any.asInstanceOf[ClassTag[B]] +} From e4962abc2d3704adcec49d509236ce736a2328b7 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 26 Feb 2014 19:40:48 -0800 Subject: [PATCH 13/23] Use fakeClassTag --- .../api/java/function/MapPartitionsWithIndexFunction.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala index 451890c29cfa6..61e0827bc6eaf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala @@ -19,10 +19,12 @@ package org.apache.spark.api.java.function import scala.reflect.ClassTag +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag + /** * A function that takes the a partition */ abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, java.util.Iterator[A], java.util.Iterator[B]] { - def elementType() : ClassTag[B] = ClassTag.Any.asInstanceOf[ClassTag[B]] + def elementType() : ClassTag[B] = fakeClassTag } From f484afce126108375e1205f2b68f1577c4ef9598 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 16 Feb 2014 23:39:39 -0800 Subject: [PATCH 14/23] Fix Java API for mapPartitionsWithIndex --- .../org/apache/spark/api/java/JavaRDDLike.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 
a89419bbd10e7..7c20c789f3b4c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -72,11 +72,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - def mapPartitionsWithIndex[R: ClassTag]( - f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]], - preservesPartitioning: Boolean = false): JavaRDD[R] = - new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), - preservesPartitioning)) + def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R], + preservesPartitioning: Boolean = false): JavaRDD[R] = { + import scala.collection.JavaConverters._ + def fn = (a: Int, b: Iterator[T]) => f.apply(a, asJavaIterator(b)).asScala + val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(f.elementType()) + new JavaRDD(newRdd)(f.elementType()) + } /** * Return a new RDD by applying a function to all elements of this RDD. 
From 4eb9c0ff65fb0555519281274836f8b0944c8adc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 23 Feb 2014 17:02:32 -0800 Subject: [PATCH 15/23] Add missing class --- .../api/java/function/MapPartitionsWithIndexFunction.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala index 61e0827bc6eaf..079bf9df78343 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.api.java.JavaSparkContext.fakeClassTag /** - * A function that takes the a partition + * A function that takes the a partition */ abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, java.util.Iterator[A], java.util.Iterator[B]] { From 96a86c7aa3d4d67ee0fce9ca6bafb2ab1d3465a3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 13:28:13 -0800 Subject: [PATCH 16/23] It compiles with the Java 8 happy pandas --- .../function/MapPartitionsWithIndexFunction.java} | 11 ++++------- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 8 ++++---- 2 files changed, 8 insertions(+), 11 deletions(-) rename core/src/main/{scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala => java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java} (73%) diff --git a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java similarity index 73% rename from core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala rename to 
core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java index 079bf9df78343..7cb79707c37f5 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java @@ -15,16 +15,13 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import scala.reflect.ClassTag - -import org.apache.spark.api.java.JavaSparkContext.fakeClassTag +import java.io.Serializable; /** * A function that takes the a partition */ -abstract class MapPartitionsWithIndexFunction[A, B] extends Function2[Integer, - java.util.Iterator[A], java.util.Iterator[B]] { - def elementType() : ClassTag[B] = fakeClassTag +public interface MapPartitionsWithIndexFunction extends Function2, java.util.Iterator> { } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 7c20c789f3b4c..8a67a787720b0 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -72,12 +72,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. 
*/ - def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R], + def mapPartitionsWithIndex[R: ClassTag](f: MapPartitionsWithIndexFunction[T, R], preservesPartitioning: Boolean = false): JavaRDD[R] = { import scala.collection.JavaConverters._ - def fn = (a: Int, b: Iterator[T]) => f.apply(a, asJavaIterator(b)).asScala - val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(f.elementType()) - new JavaRDD(newRdd)(f.elementType()) + def fn = (a: Int, b: Iterator[T]) => f.call(a, asJavaIterator(b)).asScala + val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning) + new JavaRDD(newRdd)(fakeClassTag) } /** From df6922a8cb9d0d0a938f265a49f990b8373c9122 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 13:30:12 -0800 Subject: [PATCH 17/23] Remove old function --- .../dstream/PluggableInputDStream.scala | 31 ------------------- 1 file changed, 31 deletions(-) delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala deleted file mode 100644 index 6f9477020a459..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.streaming.StreamingContext -import scala.reflect.ClassTag - -private[streaming] -class PluggableInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { - - def getReceiver(): NetworkReceiver[T] = { - receiver - } -} From 4fa42b06dd22f969bac16a61f6029682fe833ee5 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 11 Mar 2014 14:04:35 -0700 Subject: [PATCH 18/23] Revert "Remove old function (accidentally removed a file)" This reverts commit df6922a8cb9d0d0a938f265a49f990b8373c9122. --- .../dstream/PluggableInputDStream.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala new file mode 100644 index 0000000000000..6f9477020a459 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.StreamingContext +import scala.reflect.ClassTag + +private[streaming] +class PluggableInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { + + def getReceiver(): NetworkReceiver[T] = { + receiver + } +} From 2aa2fc98efc7f60fde8d6313a8e0915da84602e9 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 11 Mar 2014 15:35:55 -0700 Subject: [PATCH 19/23] Make it work with the new Java API style --- .../apache/spark/api/java/JavaRDDLike.scala | 5 ++-- .../java/org/apache/spark/JavaAPISuite.java | 24 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 8a67a787720b0..4f6e1dc8a2132 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -72,11 +72,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. 
*/ - def mapPartitionsWithIndex[R: ClassTag](f: MapPartitionsWithIndexFunction[T, R], - preservesPartitioning: Boolean = false): JavaRDD[R] = { + def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R]): JavaRDD[R] = { import scala.collection.JavaConverters._ def fn = (a: Int, b: Iterator[T]) => f.call(a, asJavaIterator(b)).asScala - val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning) + val newRdd = rdd.mapPartitionsWithIndex(fn)(fakeClassTag) new JavaRDD(newRdd)(fakeClassTag) } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index f2fbd3e1c7044..8f7af4a1f4190 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -418,18 +418,18 @@ public void javaDoubleRDDHistoGram() { public void mapPartitionsWithIndex() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); JavaRDD rddByIndex = - rdd.mapPartitionsWithIndex(new MapPartitionsWithIndexFunction() { - @Override - public Iterator call(Integer start, java.util.Iterator iter) { - List list = new ArrayList(); - int pos = start; - while (iter.hasNext()) { - list.add(iter.next() * pos); - pos += 1; - } - return list.iterator(); - } - }, false); + rdd.mapPartitionsWithIndex(new MapPartitionsWithIndexFunction() { + @Override + public Iterator call(Integer start, java.util.Iterator iter) { + List list = new ArrayList(); + int pos = start; + while (iter.hasNext()) { + list.add(iter.next() * pos); + pos += 1; + } + return list.iterator(); + } + }); Assert.assertEquals(0, rddByIndex.first().intValue()); Integer[] values = {0, 2, 6, 12, 20}; Assert.assertEquals(Arrays.asList(values), rddByIndex.collect()); From cc4050d86032d6da2f9662bb8a2595567a994210 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 15 Mar 2014 15:35:22 -0700 Subject: [PATCH 20/23] Add Java 8 API test --- .../java/org/apache/spark/Java8APISuite.java | 18 ++++++++++++++++++ 
1 file changed, 18 insertions(+) diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index f67251217ed4a..2687f3a13726c 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -245,6 +245,24 @@ public void mapPartitions() { Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); } + @Test + public void mapPartitionsWithIndex() { + JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); + JavaRDD partitionSums = rdd.mapPartitionsWithIndex((start, iter) -> { + List list = new ArrayList(); + int sum = 0; + int pos = start; + while (iter.hasNext()) { + sum += (pos* iter.next()); + pos += 1; + } + return list.iterator(); + }); + Assert.assertEquals(0, rddByIndex.first().intValue()); + Integer[] values = {0, 2, 6, 12, 20}; + Assert.assertEquals(Arrays.asList(values), rddByIndex.collect()); + } + @Test public void sequenceFile() { File tempDir = Files.createTempDir(); From 780120df5509341ad4b8b837b05bf0de71b84a51 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 7 Apr 2014 23:19:39 -0700 Subject: [PATCH 21/23] Remove MappartitionsWithIndeFunction --- .../MapPartitionsWithIndexFunction.java | 27 ------------------- 1 file changed, 27 deletions(-) delete mode 100644 core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java deleted file mode 100644 index 7cb79707c37f5..0000000000000 --- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsWithIndexFunction.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function; - -import java.io.Serializable; - -/** - * A function that takes the a partition - */ -public interface MapPartitionsWithIndexFunction extends Function2, java.util.Iterator> { -} From a5cd2fa619254e0ca5af7aa333317cd65221c1a0 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 8 Apr 2014 00:20:50 -0700 Subject: [PATCH 22/23] Fix Java API --- .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 8a67a787720b0..5d156db8d6990 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.java -import java.util.{Comparator, List => JList} +import java.util.{Comparator, List => JList, Iterator => JIterator} import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -72,7 +72,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. 
*/ - def mapPartitionsWithIndex[R: ClassTag](f: MapPartitionsWithIndexFunction[T, R], + def mapPartitionsWithIndex[R: ClassTag](f: JFunction2[Integer, JIterator[T], JIterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = { import scala.collection.JavaConverters._ def fn = (a: Int, b: Iterator[T]) => f.call(a, asJavaIterator(b)).asScala From b231375c92ecad85ed084e8a5a865c8eb495a4ee Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 8 Apr 2014 00:26:11 -0700 Subject: [PATCH 23/23] Fix the classtag bits --- .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 5d156db8d6990..53da0fced0a10 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -72,11 +72,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - def mapPartitionsWithIndex[R: ClassTag](f: JFunction2[Integer, JIterator[T], JIterator[R]], + def mapPartitionsWithIndex[R](f: JFunction2[Integer, JIterator[T], JIterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = { import scala.collection.JavaConverters._ def fn = (a: Int, b: Iterator[T]) => f.call(a, asJavaIterator(b)).asScala - val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning) + val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(fakeClassTag[R]) new JavaRDD(newRdd)(fakeClassTag) }