From 4eb04e971244d2c49085e17ae4685a31e6808066 Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Wed, 18 Feb 2015 10:51:52 +0100 Subject: [PATCH 01/11] bugfix SPARK-5775 --- .../sql/parquet/ParquetTableOperations.scala | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 072a4bcc42ee4..4adee1175c9ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -144,19 +144,30 @@ case class ParquetTableScan( new Iterator[Row] { def hasNext = iter.hasNext def next() = { - val row = iter.next()._2.asInstanceOf[SpecificMutableRow] + iter.next()._2 match { + case row: SpecificMutableRow => { + //val row = iter.next ()._2.asInstanceOf[SpecificMutableRow] - // Parquet will leave partitioning columns empty, so we fill them in here. + // Parquet will leave partitioning columns empty, so we fill them in here. var i = 0 while (i < requestedPartitionOrdinals.size) { - row(requestedPartitionOrdinals(i)._2) = - partitionRowValues(requestedPartitionOrdinals(i)._1) - i += 1 + row (requestedPartitionOrdinals (i)._2) = + partitionRowValues (requestedPartitionOrdinals (i)._1) + i += 1 } row + } + case row : Row => { + val rVals = row.to[Array] + var i = 0 + while (i < requestedPartitionOrdinals.size) { + rVals.update(requestedPartitionOrdinals (i)._2,partitionRowValues (requestedPartitionOrdinals (i)._1)) + } + Row.fromSeq(rVals) + } } } - } + }} } else { baseRDD.map(_._2) } From dbceaa308921f298b3cd9cc98fae66e1271c7f1c Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Wed, 18 Feb 2015 12:17:38 +0100 Subject: [PATCH 02/11] cutting lines --- .../apache/spark/sql/parquet/ParquetTableOperations.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 4adee1175c9ee..756aeb21db4fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -161,7 +161,10 @@ case class ParquetTableScan( val rVals = row.to[Array] var i = 0 while (i < requestedPartitionOrdinals.size) { - rVals.update(requestedPartitionOrdinals (i)._2,partitionRowValues (requestedPartitionOrdinals (i)._1)) + rVals + .update( + requestedPartitionOrdinals (i)._2, + partitionRowValues (requestedPartitionOrdinals (i)._1)) } Row.fromSeq(rVals) } From f876dea96d50f9df0c4d9992e82b00d3a4a7968f Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Wed, 18 Feb 2015 12:17:55 +0100 Subject: [PATCH 03/11] starting to write tests --- .../spark/sql/parquet/parquetSuites.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 06fe144666a9c..c3368f1b94ff7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.hive.test.TestHive._ case class ParquetData(intField: Int, stringField: String) // 
The data that also includes the partitioning key case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) - +case class ParquetDataWithKeyAndComplexTypes(p: Int, intField: Int, stringField: String) /** * A suite to test the automatic conversion of metastore tables with parquet data to use the @@ -69,6 +69,22 @@ class ParquetMetastoreSuite extends ParquetTest { location '${partitionedTableDirWithKey.getCanonicalPath}' """) + sql(s""" + create external table partitioned_parquet_with_key_and_complextypes + ( + intField INT, + structField STRUCT, + arrayField ARRAY, + stringField STRING + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${partitionedTableDirWithKey.getCanonicalPath}' + """) + sql(s""" create external table normal_parquet ( From ae48f7c98410d320b128ed23fb5c6cdbcb8b504c Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Thu, 19 Feb 2015 19:08:48 +0100 Subject: [PATCH 04/11] unittesting SPARK-5775 --- .../spark/sql/parquet/parquetSuites.scala | 97 +++++++++++++++++-- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index c3368f1b94ff7..b4ff2584e8a02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -31,7 +31,9 @@ import org.apache.spark.sql.hive.test.TestHive._ case class ParquetData(intField: Int, stringField: String) // The data that also includes the partitioning key case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) -case class ParquetDataWithKeyAndComplexTypes(p: Int, intField: Int, stringField: String) +case class StructContainer(intStructField :Int, stringStructField: String ) +case class ParquetDataWithComplexTypes(intField :Int, stringField: String ,structField: StructContainer, arrayField: Seq[Int]) +case class ParquetDataWithKeyAndComplexTypes(p: Int,intField :Int, stringField: String , structField: StructContainer, arrayField: Seq[Int]) /** * A suite to test the automatic conversion of metastore tables with parquet data to use the @@ -69,20 +71,36 @@ class ParquetMetastoreSuite extends ParquetTest { location '${partitionedTableDirWithKey.getCanonicalPath}' """) + sql(s""" + create external table partitioned_parquet_with_complextypes + ( + intField INT, + stringField STRING, + structField STRUCT, + arrayField ARRAY + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${partitionedTableDirWithComplexTypes.getCanonicalPath}' + """) + sql(s""" create external table partitioned_parquet_with_key_and_complextypes ( intField INT, - structField STRUCT, - arrayField ARRAY, - stringField STRING + stringField STRING, + structField STRUCT, + arrayField ARRAY ) PARTITIONED BY (p int) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${partitionedTableDirWithKey.getCanonicalPath}' + location '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' """) sql(s""" @@ -106,6 +124,14 @@ class ParquetMetastoreSuite extends ParquetTest { sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)") } + (1 to 10).foreach { p => + sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)") + } + + (1 to 10).foreach { p => + sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)") + } + setConf("spark.sql.hive.convertMetastoreParquet", "true") } @@ -155,6 +181,22 @@ class ParquetSourceSuite extends ParquetTest { path '${new File(partitionedTableDir, "p=1").getCanonicalPath}' ) """) + + sql( s""" + create temporary table partitioned_parquet_with_key_and_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${new File(partitionedTableDirWithKeyAndComplexTypes, "p=1").getCanonicalPath}' + ) + """) + + sql( s""" + create temporary table partitioned_parquet_with_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${new File(partitionedTableDirWithComplexTypes, "p=1").getCanonicalPath}' + ) + """) } } @@ -164,6 +206,8 @@ class ParquetSourceSuite extends ParquetTest { abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { var partitionedTableDir: File = null var partitionedTableDirWithKey: File = null + var partitionedTableDirWithKeyAndComplexTypes: File = null + var partitionedTableDirWithComplexTypes: File = null override def beforeAll(): Unit = { partitionedTableDir = File.createTempFile("parquettests", "sparksql") @@ -187,9 +231,32 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { .map(i => ParquetDataWithKey(p, i, s"part-$p")) .saveAsParquetFile(partDir.getCanonicalPath) } + + partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql") + partitionedTableDirWithKeyAndComplexTypes.delete() + partitionedTableDirWithKeyAndComplexTypes.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetDataWithKeyAndComplexTypes(p, i,s"part-$p", StructContainer(i,f"${i}_string"), (1 to i))) + .saveAsParquetFile(partDir.getCanonicalPath) + } + + partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql") + partitionedTableDirWithComplexTypes.delete() + partitionedTableDirWithComplexTypes.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetDataWithComplexTypes(i,s"part-$p", StructContainer(i,f"${i}_string"), (1 to i))) + .saveAsParquetFile(partDir.getCanonicalPath) + } + } - Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table => + Seq("partitioned_parquet", "partitioned_parquet_with_key", "partitioned_parquet_with_key_and_complextypes","partitioned_parquet_with_complextypes").foreach { table => test(s"ordering of the partitioning columns $table") { checkAnswer( sql(s"SELECT p, stringField FROM $table WHERE p = 1"), @@ -202,6 +269,8 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { ) } + + test(s"project the partitioning column $table") { checkAnswer( sql(s"SELECT p, count(*) FROM $table group by p"), @@ -279,6 +348,22 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { } } + 
Seq("partitioned_parquet_with_key_and_complextypes", "partitioned_parquet_with_complextypes").foreach { table => + test(s"SPARK-5775 read struct from $table") { + checkAnswer( + sql(s"SELECT p, structField.intStructField , structField.stringStructField FROM $table WHERE p = 1"), + (1 to 10).map { i => ((1, i, f"${i}_string"))} + ) + } + + test (s"SPARK-5775 read array from $table") { + checkAnswer( + sql(s"SELECT arrayField, p FROM $table WHERE p = 1"), + (1 to 10).map { i => ((1 to i,1))} + ) + } + } + test("non-part select(*)") { checkAnswer( sql("SELECT COUNT(*) FROM normal_parquet"), From 22cec5206091580e9922f997ef8052ded393d225 Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Thu, 19 Feb 2015 19:18:02 +0100 Subject: [PATCH 05/11] lint compatible changes --- .../org/apache/spark/sql/parquet/ParquetTableOperations.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 756aeb21db4fb..c7886cb1bafb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -146,7 +146,6 @@ case class ParquetTableScan( def next() = { iter.next()._2 match { case row: SpecificMutableRow => { - //val row = iter.next ()._2.asInstanceOf[SpecificMutableRow] // Parquet will leave partitioning columns empty, so we fill them in here. var i = 0 From 005a7f852fd4e55857b619f40effcda9d209f6fb Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Fri, 20 Feb 2015 12:47:33 +0100 Subject: [PATCH 06/11] added test cleanup --- .../org/apache/spark/sql/parquet/parquetSuites.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index b4ff2584e8a02..4509659ad944d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -256,6 +256,14 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { } + override def afterAll(): Unit = { + //delete temporary files + partitionedTableDir.delete() + partitionedTableDirWithKey.delete() + partitionedTableDirWithKeyAndComplexTypes.delete() + partitionedTableDirWithComplexTypes.delete() + } + Seq("partitioned_parquet", "partitioned_parquet_with_key", "partitioned_parquet_with_key_and_complextypes","partitioned_parquet_with_complextypes").foreach { table => test(s"ordering of the partitioning columns $table") { checkAnswer( From 7c829cbcef3b864f32dbef69ab558b39d6ab9a09 Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Fri, 20 Feb 2015 12:48:27 +0100 Subject: [PATCH 07/11] bugfix, hopefully correct this time --- .../apache/spark/sql/parquet/ParquetTableOperations.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index c7886cb1bafb2..4c1be164618cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -150,9 +150,9 @@ case class ParquetTableScan( // Parquet will leave partitioning columns empty, so we 
fill them in here. var i = 0 while (i < requestedPartitionOrdinals.size) { - row (requestedPartitionOrdinals (i)._2) = - partitionRowValues (requestedPartitionOrdinals (i)._1) - i += 1 + row (requestedPartitionOrdinals (i)._2) = + partitionRowValues (requestedPartitionOrdinals (i)._1) + i += 1 } row } @@ -164,6 +164,7 @@ case class ParquetTableScan( .update( requestedPartitionOrdinals (i)._2, partitionRowValues (requestedPartitionOrdinals (i)._1)) + i += 1 } Row.fromSeq(rVals) } From 24928ea91234901b36f09ac88413f81ae64974c9 Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Fri, 20 Feb 2015 14:53:02 +0100 Subject: [PATCH 08/11] corrected mirror bug (see SPARK-5775) for newParquet --- .../apache/spark/sql/parquet/newParquet.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 2e0c6c51c00e5..f3488091334de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -278,10 +278,18 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) }.toMap val currentValue = partValues.values.head.toInt - iter.map { pair => - val res = pair._2.asInstanceOf[SpecificMutableRow] - res.setInt(partitionKeyLocation, currentValue) - res + iter.map { _._2 match { + case row: SpecificMutableRow => { + val res = row.asInstanceOf[SpecificMutableRow] + res.setInt(partitionKeyLocation, currentValue) + res + } + case row: Row => { + val rowContent = row.to[Array] + rowContent.update(partitionKeyLocation, currentValue) + Row.fromSeq(rowContent) + } + } } } } else { From 8fc6a8ccdf0f232ecfdf1916111b538e1fb6bfab Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Fri, 20 Feb 2015 14:53:47 +0100 Subject: [PATCH 09/11] correcting tests on temporary tables --- .../scala/org/apache/spark/sql/parquet/parquetSuites.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 4509659ad944d..0a68b8cefd28b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -186,7 +186,7 @@ class ParquetSourceSuite extends ParquetTest { create temporary table partitioned_parquet_with_key_and_complextypes USING org.apache.spark.sql.parquet OPTIONS ( - path '${new File(partitionedTableDirWithKeyAndComplexTypes, "p=1").getCanonicalPath}' + path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' ) """) @@ -194,7 +194,7 @@ class ParquetSourceSuite extends ParquetTest { create temporary table partitioned_parquet_with_complextypes USING org.apache.spark.sql.parquet OPTIONS ( - path '${new File(partitionedTableDirWithComplexTypes, "p=1").getCanonicalPath}' + path '${partitionedTableDirWithComplexTypes.getCanonicalPath}' ) """) } From 52f73fca6e99baa7777c5402e6cdeefb68958464 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 28 Feb 2015 21:15:43 +0800 Subject: [PATCH 10/11] cherry-pick & merge from aa39460d4bb4c41084d350ccb1c5a56cd61239b7 --- .../sql/parquet/ParquetTableOperations.scala | 72 ++++++++++------ .../apache/spark/sql/parquet/newParquet.scala | 51 +++++++----- .../spark/sql/parquet/parquetSuites.scala | 82 ++++++++++++++----- 3 files 
changed, 139 insertions(+), 66 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 4c1be164618cf..3bd086da9d251 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -124,6 +124,13 @@ case class ParquetTableScan( conf) if (requestedPartitionOrdinals.nonEmpty) { + // This check is based on CatalystConverter.createRootConverter. + val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType)) + + // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into + // the `mapPartitionsWithInputSplit` closure below. + val outputSize = output.size + baseRDD.mapPartitionsWithInputSplit { case (split, iter) => val partValue = "([^=]+)=([^=]+)".r val partValues = @@ -141,36 +148,49 @@ case class ParquetTableScan( relation.partitioningAttributes .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow)) - new Iterator[Row] { - def hasNext = iter.hasNext - def next() = { - iter.next()._2 match { - case row: SpecificMutableRow => { - - // Parquet will leave partitioning columns empty, so we fill them in here. - var i = 0 - while (i < requestedPartitionOrdinals.size) { - row (requestedPartitionOrdinals (i)._2) = - partitionRowValues (requestedPartitionOrdinals (i)._1) - i += 1 - } - row + // Parquet will leave partitioning columns empty, so we fill them in here. + if (primitiveRow) { + new Iterator[Row] { + def hasNext = iter.hasNext + def next() = { + // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow. + val row = iter.next()._2.asInstanceOf[SpecificMutableRow] + + var i = 0 + while (i < requestedPartitionOrdinals.size) { + row(requestedPartitionOrdinals(i)._2) = + partitionRowValues(requestedPartitionOrdinals(i)._1) + i += 1 + } + row } - case row : Row => { - val rVals = row.to[Array] - var i = 0 - while (i < requestedPartitionOrdinals.size) { - rVals - .update( - requestedPartitionOrdinals (i)._2, - partitionRowValues (requestedPartitionOrdinals (i)._1)) - i += 1 - } - Row.fromSeq(rVals) + } + } else { + // Create a mutable row since we need to fill in values from partition columns. + val mutableRow = new GenericMutableRow(outputSize) + new Iterator[Row] { + def hasNext = iter.hasNext + def next() = { + // We are using CatalystGroupConverter and it returns a GenericRow. + // Since GenericRow is not mutable, we just cast it to a Row. 
+ val row = iter.next()._2.asInstanceOf[Row] + + var i = 0 + while (i < row.size) { + mutableRow(i) = row(i) + i += 1 + } + i = 0 + while (i < requestedPartitionOrdinals.size) { + mutableRow(requestedPartitionOrdinals(i)._2) = + partitionRowValues(requestedPartitionOrdinals(i)._1) + i += 1 } + mutableRow + } } } - }} + } } else { baseRDD.map(_._2) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index f3488091334de..a7506c8c32af1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -22,9 +22,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job} -import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate -import parquet.hadoop.ParquetInputFormat +import parquet.hadoop.{ParquetInputSplit, ParquetInputFormat} import parquet.hadoop.util.ContextUtil import org.apache.spark.annotation.DeveloperApi @@ -40,7 +39,7 @@ import scala.collection.JavaConversions._ /** * Allows creation of parquet based tables using the syntax - * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option + * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option * required is `path`, which should be the location of a collection of, optionally partitioned, * parquet files. */ @@ -265,7 +264,10 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) // When the data does not include the key and the key is requested then we must fill it in // based on information from the input split. if (!dataIncludesKey && partitionKeyLocation != -1) { - baseRDD.mapPartitionsWithInputSplit { case (split, iter) => + val primitiveRow = + requestedSchema.toAttributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType)) + + baseRDD.mapPartitionsWithInputSplit { case (split, iterator) => val partValue = "([^=]+)=([^=]+)".r val partValues = split.asInstanceOf[parquet.hadoop.ParquetInputSplit] @@ -273,24 +275,35 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) .toString .split("/") .flatMap { - case partValue(key, value) => Some(key -> value) - case _ => None - }.toMap - - val currentValue = partValues.values.head.toInt - iter.map { _._2 match { - case row: SpecificMutableRow => { - val res = row.asInstanceOf[SpecificMutableRow] - res.setInt(partitionKeyLocation, currentValue) - res + case partValue(key, value) => Some(key -> value) + case _ => None } + .toMap + + if (primitiveRow) { + iterator.map { pair => + // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow. + val mutableRow = pair._2.asInstanceOf[SpecificMutableRow] + var i = 0 + mutableRow.update(partitionKeyLocation, partValues.values.head.toInt) + mutableRow } - case row: Row => { - val rowContent = row.to[Array] - rowContent.update(partitionKeyLocation, currentValue) - Row.fromSeq(rowContent) + } else { + // Create a mutable row since we need to fill in values from partition columns. + val mutableRow = new GenericMutableRow(requestedSchema.toAttributes.size) + + iterator.map { pair => + // We are using CatalystGroupConverter and it returns a GenericRow. + // Since GenericRow is not mutable, we just cast it to a Row. 
+ val row = pair._2.asInstanceOf[Row] + var i = 0 + while (i < row.size) { + mutableRow(i) = row(i) + i += 1 + } + mutableRow.update(partitionKeyLocation, partValues.values.head.toInt) + mutableRow } } - } } } else { baseRDD.map(_._2) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 0a68b8cefd28b..854dede799aff 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -30,10 +30,21 @@ import org.apache.spark.sql.hive.test.TestHive._ // The data where the partitioning key exists only in the directory structure. case class ParquetData(intField: Int, stringField: String) // The data that also includes the partitioning key -case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) -case class StructContainer(intStructField :Int, stringStructField: String ) -case class ParquetDataWithComplexTypes(intField :Int, stringField: String ,structField: StructContainer, arrayField: Seq[Int]) -case class ParquetDataWithKeyAndComplexTypes(p: Int,intField :Int, stringField: String , structField: StructContainer, arrayField: Seq[Int]) +case class ParquetDataWithKey(p:Int, intField: Int, stringField: String) +case class StructContainer(intStructField :Int, stringStructField: String) + +case class ParquetDataWithComplexTypes( + intField: Int, + stringField: String, + structField: StructContainer, + arrayField: Seq[Int]) + +case class ParquetDataWithKeyAndComplexTypes( + p: Int, + intField: Int, + stringField: String, + structField: StructContainer, + arrayField: Seq[Int]) /** * A suite to test the automatic conversion of metastore tables with parquet data to use the @@ -136,6 +147,12 @@ class ParquetMetastoreSuite extends ParquetTest { } override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS partitioned_parquet") + sql("DROP TABLE IF EXISTS partitioned_parquet_with_key") + sql("DROP TABLE IF EXISTS partitioned_parquet_with_complextypes") + sql("DROP TABLE IF EXISTS partitioned_parquet_with_key_and_complextypes") + sql("DROP TABLE IF EXISTS normal_parquet") + setConf("spark.sql.hive.convertMetastoreParquet", "false") } @@ -183,7 +200,7 @@ class ParquetSourceSuite extends ParquetTest { """) sql( s""" - create temporary table partitioned_parquet_with_key_and_complextypes + CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes USING org.apache.spark.sql.parquet OPTIONS ( path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' @@ -191,7 +208,7 @@ class ParquetSourceSuite extends ParquetTest { """) sql( s""" - create temporary table partitioned_parquet_with_complextypes + CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes USING org.apache.spark.sql.parquet OPTIONS ( path '${partitionedTableDirWithComplexTypes.getCanonicalPath}' @@ -205,6 +222,7 @@ class ParquetSourceSuite extends ParquetTest { */ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { var partitionedTableDir: File = null + var normalTableDir: File = null var partitionedTableDirWithKey: File = null var partitionedTableDirWithKeyAndComplexTypes: File = null var partitionedTableDirWithComplexTypes: File = null @@ -221,6 +239,15 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { .saveAsParquetFile(partDir.getCanonicalPath) } + normalTableDir = File.createTempFile("parquettests", "sparksql") + normalTableDir.delete() + 
normalTableDir.mkdir() + + sparkContext + .makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-1")) + .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath) + partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql") partitionedTableDirWithKey.delete() partitionedTableDirWithKey.mkdir() @@ -239,7 +266,8 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { (1 to 10).foreach { p => val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p") sparkContext.makeRDD(1 to 10) - .map(i => ParquetDataWithKeyAndComplexTypes(p, i,s"part-$p", StructContainer(i,f"${i}_string"), (1 to i))) + .map(i => ParquetDataWithKeyAndComplexTypes( + p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)) .saveAsParquetFile(partDir.getCanonicalPath) } @@ -250,21 +278,26 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { (1 to 10).foreach { p => val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") sparkContext.makeRDD(1 to 10) - .map(i => ParquetDataWithComplexTypes(i,s"part-$p", StructContainer(i,f"${i}_string"), (1 to i))) + .map(i => ParquetDataWithComplexTypes( + i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)) .saveAsParquetFile(partDir.getCanonicalPath) } - } - override def afterAll(): Unit = { + override protected def afterAll(): Unit = { //delete temporary files partitionedTableDir.delete() + normalTableDir.delete() partitionedTableDirWithKey.delete() partitionedTableDirWithKeyAndComplexTypes.delete() partitionedTableDirWithComplexTypes.delete() } - Seq("partitioned_parquet", "partitioned_parquet_with_key", "partitioned_parquet_with_key_and_complextypes","partitioned_parquet_with_complextypes").foreach { table => + Seq( + "partitioned_parquet", + "partitioned_parquet_with_key", + "partitioned_parquet_with_complextypes", + "partitioned_parquet_with_key_and_complextypes").foreach { table => test(s"ordering of the partitioning columns $table") { checkAnswer( sql(s"SELECT p, stringField FROM $table WHERE p = 1"), @@ -356,19 +389,26 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { } } - Seq("partitioned_parquet_with_key_and_complextypes", "partitioned_parquet_with_complextypes").foreach { table => - test(s"SPARK-5775 read struct from $table") { + Seq( + "partitioned_parquet_with_key_and_complextypes", + "partitioned_parquet_with_complextypes").foreach { table => + test(s"SPARK-5775 read structure from $table") { checkAnswer( - sql(s"SELECT p, structField.intStructField , structField.stringStructField FROM $table WHERE p = 1"), - (1 to 10).map { i => ((1, i, f"${i}_string"))} - ) + sql(s""" + SELECT + p, + structField.intStructField, + structField.stringStructField + FROM $table + WHERE p = 1"""), + (1 to 10).map(i => Row(1, i, f"${i}_string"))) } - test (s"SPARK-5775 read array from $table") { - checkAnswer( - sql(s"SELECT arrayField, p FROM $table WHERE p = 1"), - (1 to 10).map { i => ((1 to i,1))} - ) + // Re-enable this after SPARK-5508 is fixed + ignore(s"SPARK-5775 read array from $table") { + checkAnswer( + sql(s"SELECT arrayField, p FROM $table WHERE p = 1"), + (1 to 10).map(i => Row(1 to i, 1))) } } From 6a4c53d9491d182cc90c3160c7418b58f3b3062a Mon Sep 17 00:00:00 2001 From: Anselme Vignon Date: Mon, 23 Mar 2015 09:48:39 +0100 Subject: [PATCH 11/11] style corrections --- .../org/apache/spark/sql/parquet/parquetSuites.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 854dede799aff..d788b70481a87 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -30,8 +30,8 @@ import org.apache.spark.sql.hive.test.TestHive._ // The data where the partitioning key exists only in the directory structure. case class ParquetData(intField: Int, stringField: String) // The data that also includes the partitioning key -case class ParquetDataWithKey(p:Int, intField: Int, stringField: String) -case class StructContainer(intStructField :Int, stringStructField: String) +case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) +case class StructContainer(intStructField: Int, stringStructField: String) case class ParquetDataWithComplexTypes( intField: Int, @@ -87,7 +87,7 @@ class ParquetMetastoreSuite extends ParquetTest { ( intField INT, stringField STRING, - structField STRUCT, + structField STRUCT, arrayField ARRAY ) PARTITIONED BY (p int) @@ -103,7 +103,7 @@ class ParquetMetastoreSuite extends ParquetTest { ( intField INT, stringField STRING, - structField STRUCT, + structField STRUCT, arrayField ARRAY ) PARTITIONED BY (p int)
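The sketch below is not part of any patch in this series; it is a minimal, runnable
illustration of the strategy the series converges on in PATCH 07 and PATCH 10: rows
read from a primitive-only Parquet schema come back as mutable rows whose partition
columns can be written in place, while rows from schemas with struct or array columns
come back immutable and must be copied into a mutable row before the requested
partition ordinals are filled in. Spark's Row, SpecificMutableRow and
GenericMutableRow are replaced by plain Scala collections here, so the names and
shapes are simplified stand-ins, not the actual Spark API.

    // Illustrative only; simplified stand-in types instead of Spark's internal Row classes.
    object PartitionFillSketch {
      // (indexIntoPartitionValues, ordinalInOutputRow) pairs,
      // mirroring requestedPartitionOrdinals in ParquetTableScan.
      type PartitionOrdinals = Seq[(Int, Int)]

      // Mutable-row path: write the partition values straight into the row.
      def fillInPlace(
          row: scala.collection.mutable.ArrayBuffer[Any],
          ordinals: PartitionOrdinals,
          partitionValues: IndexedSeq[Any]): Unit = {
        var i = 0
        while (i < ordinals.length) {
          row(ordinals(i)._2) = partitionValues(ordinals(i)._1)
          i += 1
        }
      }

      // Immutable-row path: copy into a mutable buffer first, then fill,
      // as PATCH 10 does with GenericMutableRow.
      def fillByCopy(
          row: IndexedSeq[Any],
          ordinals: PartitionOrdinals,
          partitionValues: IndexedSeq[Any]): IndexedSeq[Any] = {
        val mutable = Array.ofDim[Any](row.length)
        var i = 0
        while (i < row.length) {
          mutable(i) = row(i)
          i += 1
        }
        i = 0
        while (i < ordinals.length) {
          mutable(ordinals(i)._2) = partitionValues(ordinals(i)._1)
          i += 1 // the increment the first revision of PATCH 01 forgot; fixed in PATCH 07
        }
        mutable.toIndexedSeq
      }

      def main(args: Array[String]): Unit = {
        // Output schema: (intField, stringField, p); Parquet leaves the partition column empty.
        val fromParquet = IndexedSeq[Any](1, "part-1", null)
        println(fillByCopy(fromParquet, Seq((0, 2)), IndexedSeq[Any](1))) // Vector(1, part-1, 1)
      }
    }

Allocating the mutable buffer once per partition and reusing it across records, as
PATCH 10 does by creating a single GenericMutableRow outside the row iterator, keeps
the allocation cost of the nested-schema path to one row object per partition rather
than one per record.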