From 97e908072f30290396424552e6f1ef337f8a76e8 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Sun, 28 Dec 2014 23:59:37 -0800
Subject: [PATCH 1/3] parquet support for date type

---
 .../org/apache/spark/sql/parquet/ParquetConverter.scala   | 8 +++++++-
 .../apache/spark/sql/parquet/ParquetTableSupport.scala    | 1 +
 .../scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 4 ++++
 .../org/apache/spark/sql/parquet/ParquetSchemaSuite.scala | 3 ++-
 4 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index f898e4b37a56b..4a30d49d49f79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.parquet
 
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 import java.util.{TimeZone, Calendar}
 
 import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
@@ -192,6 +192,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     updateField(fieldIndex, value)
 
+  protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    updateField(fieldIndex, new Date(value))
+
   protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     updateField(fieldIndex, value)
 
@@ -388,6 +391,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     current.setInt(fieldIndex, value)
 
+  override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    current.update(fieldIndex, new Date(value))
+
   override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     current.setLong(fieldIndex, value)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 19bfba34b8f4a..c6b962261a5b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
       case DoubleType => writer.addDouble(value.asInstanceOf[Double])
       case FloatType => writer.addFloat(value.asInstanceOf[Float])
       case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+      case DateType => writer.addInteger(value.asInstanceOf[java.sql.Date].getTime.toInt)
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
           sys.error(s"Unsupported datatype $d, cannot write to consumer")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 5209581fa8357..da668f068613b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
       case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
       case ParquetPrimitiveTypeName.DOUBLE => DoubleType
       case ParquetPrimitiveTypeName.FLOAT => FloatType
+      case ParquetPrimitiveTypeName.INT32
+        if originalType == ParquetOriginalType.DATE => DateType
       case ParquetPrimitiveTypeName.INT32 => IntegerType
       case ParquetPrimitiveTypeName.INT64 => LongType
       case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
@@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
       // There is no type for Byte or Short so we promote them to INT32.
       case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
       case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
+      case DateType => Some(ParquetTypeInfo(
+        ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
       case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
       case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
       case DecimalType.Fixed(precision, scale) if precision <= 18 =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index ad880e2bc3679..321832cd43211 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -57,7 +57,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)
 
-  testSchema[(Byte, Short, Int, Long)](
+  testSchema[(Byte, Short, Int, Long, java.sql.Date)](
     "logical integral types",
     """
      |message root {
@@ -65,6 +65,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
      |  required int32 _2 (INT_16);
      |  required int32 _3 (INT_32);
      |  required int64 _4 (INT_64);
+     |  optional int32 _5 (DATE);
      |}
    """.stripMargin)
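
Note on the encoding: Parquet's DATE original type annotates an INT32 holding a count of days since the Unix epoch (1970-01-01), not milliseconds. The patch above still boxes that value through java.sql.Date, whose constructor takes milliseconds; patch 2 below moves to the raw day count. As a reference point, here is a minimal sketch of the day-count conversions (helper names are illustrative; Spark's real versions live in DateUtils, which the test in patch 3 uses, and they handle more DST edge cases):

    import java.sql.Date
    import java.util.TimeZone

    object DateEncodingSketch {
      val MillisPerDay: Long = 24L * 60 * 60 * 1000

      // Days since epoch -> java.sql.Date at local midnight of that day.
      def toJavaDate(days: Int): Date = {
        val millisUtc = days.toLong * MillisPerDay
        new Date(millisUtc - TimeZone.getDefault.getOffset(millisUtc))
      }

      // java.sql.Date -> days since epoch, dropping the time-of-day part.
      def fromJavaDate(date: Date): Int = {
        val millis = date.getTime
        ((millis + TimeZone.getDefault.getOffset(millis)) / MillisPerDay).toInt
      }

      def main(args: Array[String]): Unit = {
        println(fromJavaDate(Date.valueOf("1970-01-02"))) // 1
        println(toJavaDate(1))                            // 1970-01-02
      }
    }
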
From faef887ba8e6ca21bab148c41efd1a86af110da2 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Sun, 8 Feb 2015 22:11:12 -0800
Subject: [PATCH 2/3] parquet support for primitive date

---
 .../apache/spark/sql/parquet/ParquetConverter.scala    | 12 +++++++++---
 .../spark/sql/parquet/ParquetTableSupport.scala        |  3 ++-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 4a30d49d49f79..43ca359b51735 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.parquet
 
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 import java.util.{TimeZone, Calendar}
 
 import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
@@ -127,6 +127,12 @@ private[sql] object CatalystConverter {
             parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
         }
       }
+      case DateType => {
+        new CatalystPrimitiveConverter(parent, fieldIndex) {
+          override def addInt(value: Int): Unit =
+            parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
+        }
+      }
       case d: DecimalType => {
         new CatalystPrimitiveConverter(parent, fieldIndex) {
           override def addBinary(value: Binary): Unit =
@@ -193,7 +199,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
     updateField(fieldIndex, value)
 
   protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
-    updateField(fieldIndex, new Date(value))
+    updateField(fieldIndex, value)
 
   protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     updateField(fieldIndex, value)
@@ -392,7 +398,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
     current.setInt(fieldIndex, value)
 
   override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
-    current.update(fieldIndex, new Date(value))
+    current.update(fieldIndex, value)
 
   override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     current.setLong(fieldIndex, value)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index c6b962261a5b4..5a1b15490d273 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -212,7 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
       case DoubleType => writer.addDouble(value.asInstanceOf[Double])
       case FloatType => writer.addFloat(value.asInstanceOf[Float])
       case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
-      case DateType => writer.addInteger(value.asInstanceOf[java.sql.Date].getTime.toInt)
+      case DateType => writer.addInteger(value.asInstanceOf[Int])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
           sys.error(s"Unsupported datatype $d, cannot write to consumer")
@@ -359,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       case DoubleType => writer.addDouble(record.getDouble(index))
       case FloatType => writer.addFloat(record.getFloat(index))
       case BooleanType => writer.addBoolean(record.getBoolean(index))
+      case DateType => writer.addInteger(record.getInt(index))
       case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
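
The DateType branch added to CatalystConverter above has the same shape as the other primitive converters: an anonymous subclass overrides one typed callback and forwards the value to the parent unchanged, because the INT32 read from Parquet is already the internal days-since-epoch representation. A stripped-down sketch of that dispatch, using simplified stand-in types rather than the real parquet-mr classes:

    // Stand-in for parquet-mr's PrimitiveConverter, which rejects every
    // callback by default; subclasses accept the one type they handle.
    abstract class PrimitiveConverterSketch {
      def addInt(value: Int): Unit =
        sys.error("INT32 not expected by this converter")
    }

    // Stand-in for the parent CatalystConverter's update interface.
    trait ParentConverterSketch {
      def updateDate(fieldIndex: Int, value: Int): Unit
    }

    object ConverterSketch {
      // Mirrors the `case DateType` branch: the primitive Int passes straight
      // through, with no java.sql.Date allocated per record.
      def dateConverter(parent: ParentConverterSketch, fieldIndex: Int): PrimitiveConverterSketch =
        new PrimitiveConverterSketch {
          override def addInt(value: Int): Unit =
            parent.updateDate(fieldIndex, value)
        }
    }
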
From 2c5d54d5ab7d9c178124cc551e9073267151c105 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Tue, 17 Mar 2015 23:13:10 -0700
Subject: [PATCH 3/3] add a test case

---
 .../apache/spark/sql/parquet/ParquetIOSuite.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 5438095addeaf..203bc79f153dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -135,6 +135,21 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     }
   }
 
+  test("date type") {
+    def makeDateRDD(): DataFrame =
+      sparkContext
+        .parallelize(0 to 1000)
+        .map(i => Tuple1(DateUtils.toJavaDate(i)))
+        .toDF()
+        .select($"_1")
+
+    withTempPath { dir =>
+      val data = makeDateRDD()
+      data.saveAsParquetFile(dir.getCanonicalPath)
+      checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+    }
+  }
+
   test("map") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
     checkParquetFile(data)
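
For completeness, a hedged end-to-end sketch of what the new test exercises, written against the same 1.3-era API the test itself uses (saveAsParquetFile / parquetFile). It assumes a spark-shell-style session with a SQLContext named sqlContext in scope; the output path is arbitrary:

    import java.sql.Date

    // Build a small DataFrame with a date column via Scala reflection.
    val df = sqlContext.createDataFrame(
      (1 to 5).map(i => Tuple1(Date.valueOf(s"2015-03-0$i")))
    ).toDF("d")

    // Round-trip through Parquet; the column is stored as int32 (DATE).
    df.saveAsParquetFile("/tmp/dates.parquet")
    val loaded = sqlContext.parquetFile("/tmp/dates.parquet")

    // Values come back as java.sql.Date and support ordinary comparisons.
    loaded.filter(loaded("d") > Date.valueOf("2015-03-02")).show()
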