From f5b1b2b72099fe8dbfdbe5a165f2b6760a311c87 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 10 Sep 2015 11:16:44 -0700
Subject: [PATCH 1/2] Explicitly order data to make sure UDT values are
 properly sorted.

---
 .../sql/sources/hadoopFsRelationSuites.scala  | 137 +++++++++---------
 1 file changed, 71 insertions(+), 66 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 24f43cf7c15ca..aaa80e855c089 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -28,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -100,80 +101,84 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
-          nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+  // TODO: remove it before we merge this PR.
+  (1 to 100).foreach { i =>
+    test(s"test all data types $i") {
+      withTempPath { file =>
+        // Create the schema.
+        val struct =
+          StructType(
+            StructField("f1", FloatType, true) ::
+            StructField("f2", ArrayType(BooleanType), true) :: Nil)
+        // TODO: add CalendarIntervalType to here once we can save it out.
+        val dataTypes =
+          Seq(
+            StringType, BinaryType, NullType, BooleanType,
+            ByteType, ShortType, IntegerType, LongType,
+            FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+            DateType, TimestampType,
+            ArrayType(IntegerType), MapType(StringType, LongType), struct,
+            new MyDenseVectorUDT())
+        val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+          StructField(s"col$index", dataType, nullable = true)
+        }
+        val schema = StructType(fields)
+
+        // Generate data at the driver side. We need to materialize the data first and then
+        // create RDD.
+        val maybeDataGenerator =
+          RandomDataGenerator.forType(
+            dataType = schema,
+            nullable = true,
+            seed = Some(System.nanoTime()))
+        val dataGenerator =
+          maybeDataGenerator
+            .getOrElse(fail(s"Failed to create data generator for schema $schema"))
+        val data = (1 to 100).map { i =>
+          dataGenerator.apply() match {
+            case row: Row => row
+            case null => Row.fromSeq(Seq.fill(schema.length)(null))
+            case other =>
+              fail(s"Row or null is expected to be generated, " +
+                s"but a ${other.getClass.getCanonicalName} is generated.")
+          }
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
-
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
+        // Create a DF for the schema with random data.
+        val rdd = sqlContext.sparkContext.parallelize(data, 10)
+        val df = sqlContext.createDataFrame(rdd, schema)
 
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
+        // All columns that have supported data types of this source.
+        val supportedColumns = schema.fields.collect {
+          case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
+        }
+        val selectedColumns = util.Random.shuffle(supportedColumns.toSeq).map(col)
 
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
+        val dfToBeSaved = df.select(selectedColumns: _*)
 
-      val loadedDF =
-        sqlContext
-          .read
+        // Save the data out.
+        dfToBeSaved
+          .write
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
           .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .save(file.getCanonicalPath)
 
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        val loadedDF =
+          sqlContext
+            .read
+            .format(dataSourceName)
+            .schema(dfToBeSaved.schema)
+            .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
+            .load(file.getCanonicalPath)
+            .select(selectedColumns: _*)
+
+        // Read the data back and manually order by data to make sure UDT values are properly
+        // sorted.
+        checkAnswer(
+          loadedDF.orderBy(selectedColumns: _*),
+          dfToBeSaved.orderBy(selectedColumns: _*)
+        )
+      }
     }
   }
 
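PATCH 1/2 sidesteps checkAnswer's own row ordering, which does not put UDT values in a reliable order, by having Spark itself order both sides by the same columns before comparing (loadedDF.orderBy(selectedColumns: _*) against dfToBeSaved.orderBy(selectedColumns: _*)). A minimal standalone sketch of that save/load/order/compare pattern follows. It is illustrative only: it uses the current SparkSession API rather than this suite's sqlContext, and the object name, scratch path, and sample data are hypothetical.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    object OrderedRoundTrip extends App {
      val spark = SparkSession.builder()
        .master("local[2]")
        .appName("ordered-round-trip")
        .getOrCreate()
      import spark.implicits._

      val df = Seq((3, "c"), (1, "a"), (2, "b")).toDF("id", "name")
      val path = "/tmp/ordered-round-trip" // hypothetical scratch path

      // Save the data out and read it back; parquet files and partitions can
      // come back in any order.
      df.write.mode("overwrite").parquet(path)
      val loaded = spark.read.parquet(path)

      // Ordering both sides by the same columns makes the row-by-row comparison
      // deterministic, which is the idea behind the patch's orderBy calls.
      val sortCols = Seq(col("id"), col("name"))
      val expected = df.orderBy(sortCols: _*).collect()
      val actual = loaded.orderBy(sortCols: _*).collect()
      assert(expected.sameElements(actual), "round-tripped rows should match after ordering")

      spark.stop()
    }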
From 8d85f057a02b59a27dba49d1cd98702b3a7211cd Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 10 Sep 2015 14:41:38 -0700
Subject: [PATCH 2/2] how about overriding toString of MyDenseVector?

---
 .../scala/org/apache/spark/sql/UserDefinedTypeSuite.scala | 4 ++++
 .../apache/spark/sql/sources/hadoopFsRelationSuites.scala | 7 +++----
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index fa8f9c8e00089..35c398d20cbfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -37,6 +37,10 @@ private[sql] class MyDenseVector(val data: Array[Double]) extends Serializable {
       java.util.Arrays.equals(this.data, v.data)
     case _ => false
   }
+
+  override def toString: String = {
+    data.mkString("[", ",", "]")
+  }
 }
 
 @BeanInfo
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index aaa80e855c089..3f5394a7ff7a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -172,11 +172,10 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
             .load(file.getCanonicalPath)
             .select(selectedColumns: _*)
 
-        // Read the data back and manually order by data to make sure UDT values are properly
-        // sorted.
+        // Read the data back
         checkAnswer(
-          loadedDF.orderBy(selectedColumns: _*),
-          dfToBeSaved.orderBy(selectedColumns: _*)
+          loadedDF,
+          dfToBeSaved
         )
       }
     }
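PATCH 2/2 then reverts the explicit orderBy and instead overrides MyDenseVector.toString to print the underlying array. The reason this is enough is that the test harness sorts expected and actual rows by their string form before comparing them: with the default Object.toString, an Array[Double]-backed value renders as a class name plus an identity hash, which differs between runs, so any string-keyed sort of the rows is unstable. A plain-Scala sketch of the effect; the class and object names are hypothetical stand-ins for MyDenseVector.

    // Stand-in without the override: inherits Object.toString.
    class VectorDefault(val data: Array[Double])

    // Stand-in with the override from the patch.
    class VectorPrintable(val data: Array[Double]) {
      override def toString: String = data.mkString("[", ",", "]")
    }

    object ToStringDemo extends App {
      // Prints like "VectorDefault@6f2b958e": two vectors with identical contents
      // render differently, so sorting rows by their string form is nondeterministic.
      println(new VectorDefault(Array(1.0, 2.0)))
      println(new VectorDefault(Array(1.0, 2.0)))

      // Prints "[1.0,2.0]" both times: equal data always renders identically, so a
      // string-keyed sort of the rows is stable across runs.
      println(new VectorPrintable(Array(1.0, 2.0)))
      println(new VectorPrintable(Array(1.0, 2.0)))
    }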