@@ -37,6 +37,10 @@ private[sql] class MyDenseVector(val data: Array[Double]) extends Serializable {
       java.util.Arrays.equals(this.data, v.data)
     case _ => false
   }
+
+  override def toString: String = {
+    data.mkString("[", ",", "]")
+  }
 }
 
 @BeanInfo
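
For context, the added toString renders the wrapped array as a bracketed, comma-separated list. A minimal stand-alone sketch (the stub class and sample values here are mine, mirroring only the relevant part of the patch):

    object ToStringSketch {
      // Stand-in for the patched class; only the piece the example needs.
      class MyDenseVector(val data: Array[Double]) extends Serializable {
        override def toString: String = data.mkString("[", ",", "]")
      }

      def main(args: Array[String]): Unit = {
        val v = new MyDenseVector(Array(1.0, 2.5, -3.0))
        println(v) // prints [1.0,2.5,-3.0]
      }
    }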
@@ -28,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -100,80 +101,83 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
-          nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
-        }
-      }
-
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
-
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
-
-      val loadedDF =
-        sqlContext
-          .read
-          .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
-
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
-    }
-  }
+  // TODO: remove it before we merge this PR.
+  (1 to 100).foreach { i =>
+    test(s"test all data types $i") {
+      withTempPath { file =>
+        // Create the schema.
+        val struct =
+          StructType(
+            StructField("f1", FloatType, true) ::
+            StructField("f2", ArrayType(BooleanType), true) :: Nil)
+        // TODO: add CalendarIntervalType to here once we can save it out.
+        val dataTypes =
+          Seq(
+            StringType, BinaryType, NullType, BooleanType,
+            ByteType, ShortType, IntegerType, LongType,
+            FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+            DateType, TimestampType,
+            ArrayType(IntegerType), MapType(StringType, LongType), struct,
+            new MyDenseVectorUDT())
+        val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+          StructField(s"col$index", dataType, nullable = true)
+        }
+        val schema = StructType(fields)
+
+        // Generate data at the driver side. We need to materialize the data first and then
+        // create RDD.
+        val maybeDataGenerator =
+          RandomDataGenerator.forType(
+            dataType = schema,
+            nullable = true,
+            seed = Some(System.nanoTime()))
+        val dataGenerator =
+          maybeDataGenerator
+            .getOrElse(fail(s"Failed to create data generator for schema $schema"))
+        val data = (1 to 100).map { i =>
+          dataGenerator.apply() match {
+            case row: Row => row
+            case null => Row.fromSeq(Seq.fill(schema.length)(null))
+            case other =>
+              fail(s"Row or null is expected to be generated, " +
+                s"but a ${other.getClass.getCanonicalName} is generated.")
+          }
+        }
+
+        // Create a DF for the schema with random data.
+        val rdd = sqlContext.sparkContext.parallelize(data, 10)
+        val df = sqlContext.createDataFrame(rdd, schema)
+
+        // All columns that have supported data types of this source.
+        val supportedColumns = schema.fields.collect {
+          case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
+        }
+        val selectedColumns = util.Random.shuffle(supportedColumns.toSeq).map(col)
+
+        val dfToBeSaved = df.select(selectedColumns: _*)
+
+        // Save the data out.
+        dfToBeSaved
+          .write
+          .format(dataSourceName)
+          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
+          .save(file.getCanonicalPath)
+
+        val loadedDF =
+          sqlContext
+            .read
+            .format(dataSourceName)
+            .schema(dfToBeSaved.schema)
+            .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
+            .load(file.getCanonicalPath)
+            .select(selectedColumns: _*)
+
+        // Read the data back
+        checkAnswer(
+          loadedDF,
+          dfToBeSaved
+        )
+      }
+    }
+  }
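
Registering the same test body 100 times, as the hunk above does, is a common way to flush out a flaky, seed-dependent test before a fix lands (hence the TODO to remove the loop before merging). A minimal ScalaTest sketch of the same registration pattern, with a suite and test of my own:

    import org.scalatest.FunSuite

    import scala.util.Random

    // Registers 100 independent copies of one randomized test, so a
    // low-probability failure has a realistic chance of surfacing in CI.
    class RepeatedRandomSuite extends FunSuite {
      (1 to 100).foreach { i =>
        test(s"shuffle preserves elements $i") {
          val xs = Seq.fill(10)(Random.nextInt(100))
          assert(Random.shuffle(xs).sorted == xs.sorted)
        }
      }
    }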

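The other substantive change swaps selectExpr(names: _*) for select(names.map(col): _*), which is what the new org.apache.spark.sql.functions._ import provides: selectExpr runs each string through the SQL expression parser, while col builds a plain column reference from the name, using it verbatim. A small self-contained sketch against the Spark 1.x API used in this suite (the object name and sample data are mine):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions.col

    object SelectColSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("sketch").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)

        val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("col0", "col1")

        val names = Seq("col1", "col0") // e.g. a shuffled column order
        // selectExpr parses each string as a SQL expression...
        val byExpr = df.selectExpr(names: _*)
        // ...while select(col(...)) treats each string purely as a column name.
        val byCol = df.select(names.map(col): _*)

        assert(byExpr.columns.sameElements(byCol.columns))
        sc.stop()
      }
    }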