From 7040dc9f45eae56cb706cb44cd48bea16217db1e Mon Sep 17 00:00:00 2001 From: Wesley Tang Date: Sat, 23 Jul 2016 12:35:48 +0800 Subject: [PATCH 1/4] [SPARK-16664][SQL] Fix persist call on Data frames with more than 200 columns is wiping out the data. --- .../spark/sql/execution/columnar/GenerateColumnAccessor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 7a14879b8b9df..96bd338f092e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -127,7 +127,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold) val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold) var groupedAccessorsLength = 0 - groupedAccessorsItr.zipWithIndex.map { case (body, i) => + groupedAccessorsItr.zipWithIndex.foreach { case (body, i) => groupedAccessorsLength += 1 val funcName = s"accessors$i" val funcCode = s""" @@ -137,7 +137,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera """.stripMargin ctx.addNewFunction(funcName, funcCode) } - groupedExtractorsItr.zipWithIndex.map { case (body, i) => + groupedExtractorsItr.zipWithIndex.foreach { case (body, i) => val funcName = s"extractors$i" val funcCode = s""" |private void $funcName() { From 42b5c1118fd83fc1eee67e6caaa7891921315b87 Mon Sep 17 00:00:00 2001 From: Wesley Tang Date: Sat, 23 Jul 2016 14:03:30 +0800 Subject: [PATCH 2/4] [SPARK-16664] Add test --- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 905da554f1cf1..a44179c44b941 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1571,4 +1571,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(joined, Row("x", null, null)) checkAnswer(joined.filter($"new".isNull), Row("x", null, null)) } + + test("SPARK-16664: persist with more than 200 columns") { + val size = 201l + val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(Seq.range(0, size)))) + val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true)) + val df = spark.createDataFrame(rdd, StructType(schemas), false) + assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100) + } } From 0d6c29b96d3d270880b3d0913e2df7d1f3913441 Mon Sep 17 00:00:00 2001 From: Wesley Tang Date: Sat, 23 Jul 2016 23:02:45 +0800 Subject: [PATCH 3/4] Update tests --- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index a44179c44b941..62cfd24041b3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1573,7 +1573,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-16664: persist with more than 200 columns") { - val size = 201l + val size = 201L val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(Seq.range(0, size)))) val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true)) val df = spark.createDataFrame(rdd, StructType(schemas), false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index af3ed14c122d2..8b0cf2de84db1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -227,7 +227,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val columnTypes1 = List.fill(length1)(IntegerType) val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1) - val length2 = 10000 + //SPARK-16664: the limit of janino is 8117 + val length2 = 8117 val columnTypes2 = List.fill(length2)(IntegerType) val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2) } From b3f60fad6e80f5cb9f73de2ccb9ccafe14f3c4c1 Mon Sep 17 00:00:00 2001 From: Wesley Tang Date: Tue, 26 Jul 2016 17:45:51 +0800 Subject: [PATCH 4/4] Fix style --- .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 8b0cf2de84db1..937839644ad5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -227,7 +227,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val columnTypes1 = List.fill(length1)(IntegerType) val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1) - //SPARK-16664: the limit of janino is 8117 + // SPARK-16664: the limit of janino is 8117 val length2 = 8117 val columnTypes2 = List.fill(length2)(IntegerType) val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)