From 315c7e86d53393c7e6935a6576ad89b952e6dbbc Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 8 May 2018 10:55:26 +0900 Subject: [PATCH 01/12] Fix --- .../datasources/json/JsonFileFormat.scala | 4 ++ .../datasources/json/JsonUtils.scala | 30 +++++++++ .../datasources/orc/OrcFileFormat.scala | 4 ++ .../execution/datasources/orc/OrcUtils.scala | 26 ++++++++ .../parquet/ParquetFileFormat.scala | 3 + .../datasources/parquet/ParquetUtils.scala | 50 +++++++++++++++ .../datasources/json/JsonSuite.scala | 50 +++++++++++++++ .../datasources/orc/OrcSourceSuite.scala | 64 ++++++++++++++++++- .../parquet/ParquetQuerySuite.scala | 59 ++++++++++++++++- .../spark/sql/hive/orc/OrcFileFormat.scala | 7 +- 10 files changed, 294 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index e9a0b383b5f49..dac5004223ed8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -65,6 +65,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { + JsonUtils.verifySchema(dataSchema) + val conf = job.getConfiguration val parsedOptions = new JSONOptions( options, @@ -96,6 +98,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + JsonUtils.verifySchema(dataSchema) + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala index d511594c5de1c..7cb6445c66c71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala @@ -21,6 +21,7 @@ import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.json.JSONOptions +import org.apache.spark.sql.types._ object JsonUtils { /** @@ -48,4 +49,33 @@ object JsonUtils { json.sample(withReplacement = false, options.samplingRatio, 1) } } + + /** + * Verify if the schema is supported in JSON datasource. 
+ */ + def verifySchema(schema: StructType): Unit = { + def verifyType(dataType: DataType): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case st: StructType => st.foreach { f => verifyType(f.dataType) } + + case ArrayType(elementType, _) => verifyType(elementType) + + case MapType(keyType, valueType, _) => + verifyType(keyType) + verifyType(valueType) + + case udt: UserDefinedType[_] => verifyType(udt.sqlType) + + // For backward-compatibility + case NullType => + + case _ => + throw new UnsupportedOperationException( + s"JSON data source does not support ${dataType.simpleString} data type.") + } + + schema.foreach(field => verifyType(field.dataType)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 1de2ca2914c44..f4f772166304d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -89,6 +89,8 @@ class OrcFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { + OrcUtils.verifySchema(dataSchema) + val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) val conf = job.getConfiguration @@ -141,6 +143,8 @@ class OrcFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + OrcUtils.verifySchema(dataSchema) + if (sparkSession.sessionState.conf.orcFilterPushDown) { OrcFilters.createFilter(dataSchema, filters).foreach { f => OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 460194ba61c8b..d407cd11f85a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -120,4 +120,30 @@ object OrcUtils extends Logging { } } } + + /** + * Verify if the schema is supported in ORC datasource. 
+ */ + def verifySchema(schema: StructType): Unit = { + def verifyType(dataType: DataType): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case st: StructType => st.foreach { f => verifyType(f.dataType) } + + case ArrayType(elementType, _) => verifyType(elementType) + + case MapType(keyType, valueType, _) => + verifyType(keyType) + verifyType(valueType) + + case udt: UserDefinedType[_] => verifyType(udt.sqlType) + + case _ => + throw new UnsupportedOperationException( + s"ORC data source does not support ${dataType.simpleString} data type.") + } + + schema.foreach(field => verifyType(field.dataType)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 60fc9ec7e1f82..f547a032d96b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -78,6 +78,7 @@ class ParquetFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { + ParquetUtils.verifySchema(dataSchema) val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) @@ -302,6 +303,8 @@ class ParquetFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + ParquetUtils.verifySchema(dataSchema) + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala new file mode 100644 index 0000000000000..684e745fdc017 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.types._ + + +object ParquetUtils { + + /** + * Verify if the schema is supported in Parquet datasource. 
+ */ + def verifySchema(schema: StructType): Unit = { + def verifyType(dataType: DataType): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case st: StructType => st.foreach { f => verifyType(f.dataType) } + + case ArrayType(elementType, _) => verifyType(elementType) + + case MapType(keyType, valueType, _) => + verifyType(keyType) + verifyType(valueType) + + case udt: UserDefinedType[_] => verifyType(udt.sqlType) + + case _ => + throw new UnsupportedOperationException( + s"Parquet data source does not support ${dataType.simpleString} data type.") + } + + schema.foreach(field => verifyType(field.dataType)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 897424daca0cb..1a2b3732a15ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.ExternalRDD import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType +import org.apache.spark.sql.execution.datasources.json.TestingUDT.{IntervalData, IntervalUDT} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -2494,4 +2495,53 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exception.getMessage.contains("encoding must not be included in the blacklist")) } } + + test("SPARK-24204 error handling for unsupported data types") { + withTempDir { dir => + val jsonDir = new File(dir, "json").getCanonicalPath + + // write path + var msg = intercept[AnalysisException] { + sql("select interval 1 days").write.mode("overwrite").json(jsonDir) + }.getMessage + assert(msg.contains("Cannot save interval data type into external storage.")) + + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new IntervalData()) + sql("select testType()").write.mode("overwrite").json(jsonDir) + }.getMessage + assert(msg.contains("JSON data source does not support calendarinterval data type.")) + + // read path + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.mode("overwrite").json(jsonDir) + spark.read.schema(schema).json(jsonDir).collect() + }.getMessage + assert(msg.contains("JSON data source does not support calendarinterval data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) + spark.range(1).write.mode("overwrite").json(jsonDir) + spark.read.schema(schema).json(jsonDir).collect() + }.getMessage + assert(msg.contains("JSON data source does not support calendarinterval data type.")) + } + } +} + +object TestingUDT { + + @SQLUserDefinedType(udt = classOf[IntervalUDT]) + private[sql] class IntervalData extends Serializable + + private[sql] class IntervalUDT extends UserDefinedType[IntervalData] { + + override def sqlType: DataType = CalendarIntervalType + override def serialize(obj: IntervalData): Any = + 
throw new NotImplementedError("Not implemented") + override def deserialize(datum: Any): IntervalData = + throw new NotImplementedError("Not implemented") + override def userClass: Class[IntervalData] = classOf[IntervalData] + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 02bfb7197ffc0..93063509b72a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -24,9 +24,11 @@ import java.util.Locale import org.apache.orc.OrcConf.COMPRESS import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.Row +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.datasources.orc.TestingUDT.{NullData, NullUDT} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils case class OrcData(intField: Int, stringField: String) @@ -178,6 +180,51 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts)) } } + + test("SPARK-24204 error handling for unsupported data types") { + withTempDir { dir => + val orcDir = new File(dir, "orc").getCanonicalPath + + // write path + var msg = intercept[AnalysisException] { + sql("select interval 1 days").write.mode("overwrite").orc(orcDir) + }.getMessage + assert(msg.contains("Cannot save interval data type into external storage.")) + + msg = intercept[UnsupportedOperationException] { + sql("select null").write.mode("overwrite").orc(orcDir) + }.getMessage + assert(msg.contains("ORC data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new NullData()) + sql("select testType()").write.mode("overwrite").orc(orcDir) + }.getMessage + assert(msg.contains("ORC data source does not support null data type.")) + + // read path + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support calendarinterval data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", NullType, true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support null data type.")) + } + } } class OrcSourceSuite extends OrcSuite with SharedSQLContext { @@ -216,3 +263,18 @@ class OrcSourceSuite extends OrcSuite with SharedSQLContext { """.stripMargin) } } + +object TestingUDT { + + @SQLUserDefinedType(udt = classOf[NullUDT]) + private[sql] class NullData extends Serializable + + private[sql] class NullUDT extends UserDefinedType[NullData] { + + override def sqlType: DataType = 
NullType + override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented") + override def deserialize(datum: Any): NullData = + throw new NotImplementedError("Not implemented") + override def userClass: Class[NullData] = classOf[NullData] + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index dbf637783e6d2..4c339bd9c9bea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol -import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} +import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, NullData, NullUDT, SingleElement} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -891,6 +891,51 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("SPARK-24204 error handling for unsupported data types") { + withTempDir { dir => + val parquetDir = new File(dir, "parquet").getCanonicalPath + + // write path + var msg = intercept[AnalysisException] { + sql("select interval 1 days").write.mode("overwrite").parquet(parquetDir) + }.getMessage + assert(msg.contains("Cannot save interval data type into external storage.")) + + msg = intercept[UnsupportedOperationException] { + sql("select null").write.mode("overwrite").parquet(parquetDir) + }.getMessage + assert(msg.contains("Parquet data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new NullData()) + sql("select testType()").write.mode("overwrite").parquet(parquetDir) + }.getMessage + assert(msg.contains("Parquet data source does not support null data type.")) + + // read path + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.mode("overwrite").parquet(parquetDir) + spark.read.schema(schema).parquet(parquetDir).collect() + }.getMessage + assert(msg.contains("Parquet data source does not support calendarinterval data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", NullType, true) :: Nil) + spark.range(1).write.mode("overwrite").parquet(parquetDir) + spark.read.schema(schema).parquet(parquetDir).collect() + }.getMessage + assert(msg.contains("Parquet data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) + spark.range(1).write.mode("overwrite").parquet(parquetDir) + spark.read.schema(schema).parquet(parquetDir).collect() + }.getMessage + assert(msg.contains("Parquet data source does not support null data type.")) + } + } } object TestingUDT { @@ -922,4 +967,16 @@ object TestingUDT { } } } + + @SQLUserDefinedType(udt = 
classOf[NullUDT]) + private[sql] class NullData extends Serializable + + private[sql] class NullUDT extends UserDefinedType[NullData] { + + override def sqlType: DataType = NullType + override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented") + override def deserialize(datum: Any): NullData = + throw new NotImplementedError("Not implemented") + override def userClass: Class[NullData] = classOf[NullData] + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 237ed9bc05988..2719eb4ec55af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -39,12 +39,13 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.orc.OrcOptions +import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcUtils} import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration + /** * `FileFormat` for reading ORC files. If this is moved or renamed, please update * `DataSource`'s backwardCompatibilityMap. @@ -72,6 +73,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { + OrcUtils.verifySchema(dataSchema) + val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) val configuration = job.getConfiguration @@ -121,6 +124,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + OrcUtils.verifySchema(dataSchema) + if (sparkSession.sessionState.conf.orcFilterPushDown) { // Sets pushed predicates OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f => From 731c8737b34588d08f930101f50852299cf9482c Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 23 May 2018 11:01:53 +0900 Subject: [PATCH 02/12] Fix --- .../spark/sql/FileBasedDataSourceSuite.scala | 76 ++++++++++++++++++- .../datasources/json/JsonSuite.scala | 1 - .../datasources/orc/OrcSourceSuite.scala | 19 +---- .../parquet/ParquetQuerySuite.scala | 14 +--- .../sql/hive/orc/HiveOrcSourceSuite.scala | 49 +++++++++++- 5 files changed, 125 insertions(+), 34 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 06303099f5310..786bf457ed869 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -17,12 +17,14 @@ package org.apache.spark.sql -import java.io.FileNotFoundException +import java.io.{File, FileNotFoundException} +import java.util.Locale import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException +import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf 
import org.apache.spark.sql.test.SharedSQLContext @@ -202,4 +204,76 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } } + + test("SPARK-24204 error handling for unsupported data types") { + withTempDir { dir => + val tempDir = new File(dir, "files").getCanonicalPath + + Seq("parquet", "orc", "json").foreach { format => + // write path + var msg = intercept[AnalysisException] { + sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.contains("Cannot save interval data type into external storage.")) + + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new IntervalData()) + sql("select testType()").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support calendarinterval data type.")) + + // read path + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support calendarinterval data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support calendarinterval data type.")) + } + + Seq("parquet", "orc").foreach { format => + // write path + var msg = intercept[UnsupportedOperationException] { + sql("select null").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support null data type.")) + + // read path + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", NullType, true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support null data type.")) + } + } + } +} + +object TestingUDT { + + @SQLUserDefinedType(udt = classOf[IntervalUDT]) + class IntervalData extends Serializable + + class IntervalUDT extends UserDefinedType[IntervalData] { + + override def sqlType: DataType = CalendarIntervalType + override def serialize(obj: IntervalData): Any = + throw new NotImplementedError("Not implemented") + override def deserialize(datum: Any): IntervalData = + throw new NotImplementedError("Not implemented") + override def userClass: Class[IntervalData] = classOf[IntervalData] + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 1a2b3732a15ff..db57abec36c59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.ExternalRDD import 
org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType -import org.apache.spark.sql.execution.datasources.json.TestingUDT.{IntervalData, IntervalUDT} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 93063509b72a2..c0cef5e7137d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -24,11 +24,9 @@ import java.util.Locale import org.apache.orc.OrcConf.COMPRESS import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.{AnalysisException, Row} -import org.apache.spark.sql.execution.datasources.orc.TestingUDT.{NullData, NullUDT} +import org.apache.spark.sql.Row import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types._ import org.apache.spark.util.Utils case class OrcData(intField: Int, stringField: String) @@ -263,18 +261,3 @@ class OrcSourceSuite extends OrcSuite with SharedSQLContext { """.stripMargin) } } - -object TestingUDT { - - @SQLUserDefinedType(udt = classOf[NullUDT]) - private[sql] class NullData extends Serializable - - private[sql] class NullUDT extends UserDefinedType[NullData] { - - override def sqlType: DataType = NullType - override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented") - override def deserialize(datum: Any): NullData = - throw new NotImplementedError("Not implemented") - override def userClass: Class[NullData] = classOf[NullData] - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 4c339bd9c9bea..dd8dba34e502d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol -import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, NullData, NullUDT, SingleElement} +import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -967,16 +967,4 @@ object TestingUDT { } } } - - @SQLUserDefinedType(udt = classOf[NullUDT]) - private[sql] class NullData extends Serializable - - private[sql] class NullUDT extends UserDefinedType[NullData] { - - override def sqlType: DataType = NullType - override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented") - override def deserialize(datum: Any): NullData = - throw new NotImplementedError("Not implemented") - override def userClass: 
Class[NullData] = classOf[NullData] - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index d556a030e2186..69009e1b520c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.orc import java.io.File -import org.apache.spark.sql.Row +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.datasources.orc.OrcSuite import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.HiveSerDe +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { @@ -133,4 +135,49 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { Utils.deleteRecursively(location) } } + + test("SPARK-24204 error handling for unsupported data types") { + withTempDir { dir => + val orcDir = new File(dir, "orc").getCanonicalPath + + // write path + var msg = intercept[AnalysisException] { + sql("select interval 1 days").write.mode("overwrite").orc(orcDir) + }.getMessage + assert(msg.contains("Cannot save interval data type into external storage.")) + + msg = intercept[UnsupportedOperationException] { + sql("select null").write.mode("overwrite").orc(orcDir) + }.getMessage + assert(msg.contains("ORC data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new IntervalData()) + sql("select testType()").write.mode("overwrite").orc(orcDir) + }.getMessage + assert(msg.contains("ORC data source does not support calendarinterval data type.")) + + // read path + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support calendarinterval data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", NullType, true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support calendarinterval data type.")) + } + } } From cf4cf2f147847648ad45b3681839202ef1b7e475 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 23 May 2018 11:23:44 +0900 Subject: [PATCH 03/12] Fix --- ...rquetUtils.scala => DataSourceUtils.scala} | 13 +++--- .../datasources/json/JsonFileFormat.scala | 4 +- .../datasources/json/JsonUtils.scala | 30 ------------- .../datasources/orc/OrcFileFormat.scala | 4 +- .../execution/datasources/orc/OrcUtils.scala | 26 ----------- .../parquet/ParquetFileFormat.scala | 4 +- .../datasources/orc/OrcSourceSuite.scala | 45 ------------------- 
.../parquet/ParquetQuerySuite.scala | 45 ------------------- .../spark/sql/hive/orc/OrcFileFormat.scala | 7 ++- 9 files changed, 17 insertions(+), 161 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{parquet/ParquetUtils.scala => DataSourceUtils.scala} (82%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala similarity index 82% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 684e745fdc017..6091496069e1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -15,17 +15,17 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources.parquet +package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.types._ -object ParquetUtils { +object DataSourceUtils { /** - * Verify if the schema is supported in Parquet datasource. + * Verify if the schema is supported in datasource. */ - def verifySchema(schema: StructType): Unit = { + def verifySchema(format: String, schema: StructType): Unit = { def verifyType(dataType: DataType): Unit = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | StringType | BinaryType | DateType | TimestampType | _: DecimalType => @@ -40,9 +40,12 @@ object ParquetUtils { case udt: UserDefinedType[_] => verifyType(udt.sqlType) + // For backward-compatibility + case NullType if format == "JSON" => + case _ => throw new UnsupportedOperationException( - s"Parquet data source does not support ${dataType.simpleString} data type.") + s"$format data source does not support ${dataType.simpleString} data type.") } schema.foreach(field => verifyType(field.dataType)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index dac5004223ed8..21b6d4037e62d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -65,7 +65,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - JsonUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("JSON", dataSchema) val conf = job.getConfiguration val parsedOptions = new JSONOptions( @@ -98,7 +98,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - JsonUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("JSON", dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala index 7cb6445c66c71..d511594c5de1c 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala @@ -21,7 +21,6 @@ import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.json.JSONOptions -import org.apache.spark.sql.types._ object JsonUtils { /** @@ -49,33 +48,4 @@ object JsonUtils { json.sample(withReplacement = false, options.samplingRatio, 1) } } - - /** - * Verify if the schema is supported in JSON datasource. - */ - def verifySchema(schema: StructType): Unit = { - def verifyType(dataType: DataType): Unit = dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => - - case st: StructType => st.foreach { f => verifyType(f.dataType) } - - case ArrayType(elementType, _) => verifyType(elementType) - - case MapType(keyType, valueType, _) => - verifyType(keyType) - verifyType(valueType) - - case udt: UserDefinedType[_] => verifyType(udt.sqlType) - - // For backward-compatibility - case NullType => - - case _ => - throw new UnsupportedOperationException( - s"JSON data source does not support ${dataType.simpleString} data type.") - } - - schema.foreach(field => verifyType(field.dataType)) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index f4f772166304d..9d3611272ff16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -89,7 +89,7 @@ class OrcFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - OrcUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("ORC", dataSchema) val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) @@ -143,7 +143,7 @@ class OrcFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - OrcUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("ORC", dataSchema) if (sparkSession.sessionState.conf.orcFilterPushDown) { OrcFilters.createFilter(dataSchema, filters).foreach { f => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index d407cd11f85a4..460194ba61c8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -120,30 +120,4 @@ object OrcUtils extends Logging { } } } - - /** - * Verify if the schema is supported in ORC datasource. 
- */ - def verifySchema(schema: StructType): Unit = { - def verifyType(dataType: DataType): Unit = dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => - - case st: StructType => st.foreach { f => verifyType(f.dataType) } - - case ArrayType(elementType, _) => verifyType(elementType) - - case MapType(keyType, valueType, _) => - verifyType(keyType) - verifyType(valueType) - - case udt: UserDefinedType[_] => verifyType(udt.sqlType) - - case _ => - throw new UnsupportedOperationException( - s"ORC data source does not support ${dataType.simpleString} data type.") - } - - schema.foreach(field => verifyType(field.dataType)) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index f547a032d96b7..95e6b5f12dbaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -78,7 +78,7 @@ class ParquetFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - ParquetUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("Parquet", dataSchema) val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) @@ -303,7 +303,7 @@ class ParquetFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - ParquetUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("Parquet", dataSchema) hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index c0cef5e7137d6..02bfb7197ffc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -178,51 +178,6 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts)) } } - - test("SPARK-24204 error handling for unsupported data types") { - withTempDir { dir => - val orcDir = new File(dir, "orc").getCanonicalPath - - // write path - var msg = intercept[AnalysisException] { - sql("select interval 1 days").write.mode("overwrite").orc(orcDir) - }.getMessage - assert(msg.contains("Cannot save interval data type into external storage.")) - - msg = intercept[UnsupportedOperationException] { - sql("select null").write.mode("overwrite").orc(orcDir) - }.getMessage - assert(msg.contains("ORC data source does not support null data type.")) - - msg = intercept[UnsupportedOperationException] { - spark.udf.register("testType", () => new NullData()) - sql("select testType()").write.mode("overwrite").orc(orcDir) - }.getMessage - assert(msg.contains("ORC data source does not support null data type.")) - - // read path - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) - spark.range(1).write.mode("overwrite").orc(orcDir) - 
spark.read.schema(schema).orc(orcDir).collect() - }.getMessage - assert(msg.contains("ORC data source does not support calendarinterval data type.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", NullType, true) :: Nil) - spark.range(1).write.mode("overwrite").orc(orcDir) - spark.read.schema(schema).orc(orcDir).collect() - }.getMessage - assert(msg.contains("ORC data source does not support null data type.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) - spark.range(1).write.mode("overwrite").orc(orcDir) - spark.read.schema(schema).orc(orcDir).collect() - }.getMessage - assert(msg.contains("ORC data source does not support null data type.")) - } - } } class OrcSourceSuite extends OrcSuite with SharedSQLContext { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index dd8dba34e502d..dbf637783e6d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -891,51 +891,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } - - test("SPARK-24204 error handling for unsupported data types") { - withTempDir { dir => - val parquetDir = new File(dir, "parquet").getCanonicalPath - - // write path - var msg = intercept[AnalysisException] { - sql("select interval 1 days").write.mode("overwrite").parquet(parquetDir) - }.getMessage - assert(msg.contains("Cannot save interval data type into external storage.")) - - msg = intercept[UnsupportedOperationException] { - sql("select null").write.mode("overwrite").parquet(parquetDir) - }.getMessage - assert(msg.contains("Parquet data source does not support null data type.")) - - msg = intercept[UnsupportedOperationException] { - spark.udf.register("testType", () => new NullData()) - sql("select testType()").write.mode("overwrite").parquet(parquetDir) - }.getMessage - assert(msg.contains("Parquet data source does not support null data type.")) - - // read path - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) - spark.range(1).write.mode("overwrite").parquet(parquetDir) - spark.read.schema(schema).parquet(parquetDir).collect() - }.getMessage - assert(msg.contains("Parquet data source does not support calendarinterval data type.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", NullType, true) :: Nil) - spark.range(1).write.mode("overwrite").parquet(parquetDir) - spark.read.schema(schema).parquet(parquetDir).collect() - }.getMessage - assert(msg.contains("Parquet data source does not support null data type.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) - spark.range(1).write.mode("overwrite").parquet(parquetDir) - spark.read.schema(schema).parquet(parquetDir).collect() - }.getMessage - assert(msg.contains("Parquet data source does not support null data type.")) - } - } } object TestingUDT { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 
2719eb4ec55af..1ad01cf984b6d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -39,13 +39,12 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcUtils} +import org.apache.spark.sql.execution.datasources.orc.OrcOptions import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration - /** * `FileFormat` for reading ORC files. If this is moved or renamed, please update * `DataSource`'s backwardCompatibilityMap. @@ -73,7 +72,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - OrcUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("ORC", dataSchema) val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) @@ -124,7 +123,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - OrcUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema("ORC", dataSchema) if (sparkSession.sessionState.conf.orcFilterPushDown) { // Sets pushed predicates From 3158e00b34bffec9440a3dc0e1470570450078db Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 28 May 2018 12:35:11 +0900 Subject: [PATCH 04/12] Fix --- .../datasources/DataSourceUtils.scala | 7 +- .../datasources/json/JsonFileFormat.scala | 4 +- .../datasources/orc/OrcFileFormat.scala | 4 +- .../parquet/ParquetFileFormat.scala | 4 +- .../spark/sql/FileBasedDataSourceSuite.scala | 69 ++++++++++++++++++- .../execution/datasources/csv/CSVSuite.scala | 33 --------- .../spark/sql/hive/orc/OrcFileFormat.scala | 4 +- 7 files changed, 78 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 6091496069e1f..5149e480bdb88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.types._ @@ -25,7 +26,7 @@ object DataSourceUtils { /** * Verify if the schema is supported in datasource. 
*/ - def verifySchema(format: String, schema: StructType): Unit = { + def verifySchema(format: FileFormat, schema: StructType): Unit = { def verifyType(dataType: DataType): Unit = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | StringType | BinaryType | DateType | TimestampType | _: DecimalType => @@ -40,8 +41,8 @@ object DataSourceUtils { case udt: UserDefinedType[_] => verifyType(udt.sqlType) - // For backward-compatibility - case NullType if format == "JSON" => + // For JSON backward-compatibility + case NullType if format.isInstanceOf[JsonFileFormat] => case _ => throw new UnsupportedOperationException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 21b6d4037e62d..36310a5939b57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -65,7 +65,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema("JSON", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) val conf = job.getConfiguration val parsedOptions = new JSONOptions( @@ -98,7 +98,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - DataSourceUtils.verifySchema("JSON", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 9d3611272ff16..3f2982fa19d63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -89,7 +89,7 @@ class OrcFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema("ORC", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) @@ -143,7 +143,7 @@ class OrcFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifySchema("ORC", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) if (sparkSession.sessionState.conf.orcFilterPushDown) { OrcFilters.createFilter(dataSchema, filters).foreach { f => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 95e6b5f12dbaa..a1a8d71fd4b64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -78,7 +78,7 @@ class ParquetFileFormat job: Job, options: Map[String, String], 
dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema("Parquet", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) @@ -303,7 +303,7 @@ class ParquetFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifySchema("Parquet", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 786bf457ed869..880c1460359e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException -import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT} +import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -205,11 +205,47 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } + // Unsupported data types of csv, json, orc, and parquet are as follows; + // csv -> Interval, Null, Array, Map, Struct + // json -> Interval + // orc -> Interval, Null + // parquet -> Interval, Null test("SPARK-24204 error handling for unsupported data types") { + withTempDir { dir => + val csvDir = new File(dir, "csv").getCanonicalPath + var msg = intercept[UnsupportedOperationException] { + Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir) + }.getMessage + assert(msg.contains("CSV data source does not support struct data type")) + + msg = intercept[UnsupportedOperationException] { + Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir) + }.getMessage + assert(msg.contains("CSV data source does not support map data type")) + + msg = intercept[UnsupportedOperationException] { + Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir) + }.getMessage + assert(msg.contains("CSV data source does not support array data type")) + + msg = intercept[UnsupportedOperationException] { + Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors") + .write.csv(csvDir) + }.getMessage + assert(msg.contains("CSV data source does not support array data type")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) + spark.range(1).write.csv(csvDir) + spark.read.schema(schema).csv(csvDir).collect() + }.getMessage + assert(msg.contains("CSV data source does not support array data type.")) + } + withTempDir { dir => val tempDir = new File(dir, "files").getCanonicalPath - Seq("parquet", "orc", "json").foreach { format => + Seq("parquet", "orc", "json", "csv").foreach { format => // write path var msg = intercept[AnalysisException] { sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) @@ -241,7 +277,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo .contains(s"$format data source does not support 
calendarinterval data type.")) } - Seq("parquet", "orc").foreach { format => + Seq("parquet", "orc", "csv").foreach { format => // write path var msg = intercept[UnsupportedOperationException] { sql("select null").write.format(format).mode("overwrite").save(tempDir) @@ -249,6 +285,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support null data type.")) + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new NullData()) + sql("select testType()").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support null data type.")) + // read path msg = intercept[UnsupportedOperationException] { val schema = StructType(StructField("a", NullType, true) :: Nil) @@ -257,6 +300,14 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo }.getMessage assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support null data type.")) } } } @@ -276,4 +327,16 @@ object TestingUDT { throw new NotImplementedError("Not implemented") override def userClass: Class[IntervalData] = classOf[IntervalData] } + + @SQLUserDefinedType(udt = classOf[NullUDT]) + private[sql] class NullData extends Serializable + + private[sql] class NullUDT extends UserDefinedType[NullData] { + + override def sqlType: DataType = NullType + override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented") + override def deserialize(datum: Any): NullData = + throw new NotImplementedError("Not implemented") + override def userClass: Class[NullData] = classOf[NullData] + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index d2f166c7d1877..365239d040ef2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -740,39 +740,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(numbers.count() == 8) } - test("error handling for unsupported data types.") { - withTempDir { dir => - val csvDir = new File(dir, "csv").getCanonicalPath - var msg = intercept[UnsupportedOperationException] { - Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir) - }.getMessage - assert(msg.contains("CSV data source does not support struct data type")) - - msg = intercept[UnsupportedOperationException] { - Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir) - }.getMessage - assert(msg.contains("CSV data source does not support map data type")) - - msg = intercept[UnsupportedOperationException] { - Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir) - }.getMessage - assert(msg.contains("CSV data source does not support array data type")) - - msg = intercept[UnsupportedOperationException] { - 
Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors") - .write.csv(csvDir) - }.getMessage - assert(msg.contains("CSV data source does not support array data type")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) - spark.range(1).write.csv(csvDir) - spark.read.schema(schema).csv(csvDir).collect() - }.getMessage - assert(msg.contains("CSV data source does not support array data type.")) - } - } - test("SPARK-15585 turn off quotations") { val cars = spark.read .format("csv") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 1ad01cf984b6d..b36b2ec3ff273 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -72,7 +72,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema("ORC", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) @@ -123,7 +123,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifySchema("ORC", dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) if (sparkSession.sessionState.conf.orcFilterPushDown) { // Sets pushed predicates From 0b25c4df04457d1317c4d4aeea29ee6047e53887 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 28 May 2018 12:46:53 +0900 Subject: [PATCH 05/12] Merge with CSV.verifySChema --- .../datasources/DataSourceUtils.scala | 13 ++++++++++--- .../datasources/csv/CSVFileFormat.scala | 4 ++-- .../execution/datasources/csv/CSVUtils.scala | 19 ------------------- 3 files changed, 12 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 5149e480bdb88..cbd0aae09c59c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.types._ @@ -27,10 +28,18 @@ object DataSourceUtils { * Verify if the schema is supported in datasource. 
*/ def verifySchema(format: FileFormat, schema: StructType): Unit = { + def throwUnsupportedException(dataType: DataType): Unit = { + throw new UnsupportedOperationException( + s"$format data source does not support ${dataType.simpleString} data type.") + } + def verifyType(dataType: DataType): Unit = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | StringType | BinaryType | DateType | TimestampType | _: DecimalType => + case _: StructType | _: ArrayType | _: MapType if format.isInstanceOf[CSVFileFormat] => + throwUnsupportedException(dataType) + case st: StructType => st.foreach { f => verifyType(f.dataType) } case ArrayType(elementType, _) => verifyType(elementType) @@ -44,9 +53,7 @@ object DataSourceUtils { // For JSON backward-compatibility case NullType if format.isInstanceOf[JsonFileFormat] => - case _ => - throw new UnsupportedOperationException( - s"$format data source does not support ${dataType.simpleString} data type.") + case _ => throwUnsupportedException(dataType) } schema.foreach(field => verifyType(field.dataType)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index b90275de9f40a..12b547e4e41a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -66,7 +66,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - CSVUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) val conf = job.getConfiguration val csvOptions = new CSVOptions( options, @@ -98,7 +98,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - CSVUtils.verifySchema(dataSchema) + DataSourceUtils.verifySchema(this, dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 1012e774118e2..7ce65fa89b02d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -117,25 +117,6 @@ object CSVUtils { } } - /** - * Verify if the schema is supported in CSV datasource. - */ - def verifySchema(schema: StructType): Unit = { - def verifyType(dataType: DataType): Unit = dataType match { - case ByteType | ShortType | IntegerType | LongType | FloatType | - DoubleType | BooleanType | _: DecimalType | TimestampType | - DateType | StringType => - - case udt: UserDefinedType[_] => verifyType(udt.sqlType) - - case _ => - throw new UnsupportedOperationException( - s"CSV data source does not support ${dataType.simpleString} data type.") - } - - schema.foreach(field => verifyType(field.dataType)) - } - /** * Sample CSV dataset as configured by `samplingRatio`. 
*/ From 927497dbb519fa4419f704371aee3a046fdcee44 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 15 Jun 2018 16:12:36 +0900 Subject: [PATCH 06/12] Fix --- .../sql/execution/datasources/DataSourceUtils.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index cbd0aae09c59c..969fae7c1a1ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -25,7 +25,15 @@ import org.apache.spark.sql.types._ object DataSourceUtils { /** - * Verify if the schema is supported in datasource. + * Verify if the schema is supported in datasource. This verification should be done + * in a driver side, e.g., `prepareWrite`, `buildReader`, and `buildReaderWithPartitionValues` + * in `FileFormat`. + * + * Unsupported data types of csv, json, orc, and parquet are as follows; + * csv -> Interval, Null, Array, Map, Struct + * json -> Interval + * orc -> Interval, Null + * parquet -> Interval, Null */ def verifySchema(format: FileFormat, schema: StructType): Unit = { def throwUnsupportedException(dataType: DataType): Unit = { From 589479de6e38a60c24076f7b027d168782fa2abb Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sat, 16 Jun 2018 09:52:04 +0900 Subject: [PATCH 07/12] Add tests for CSV --- .../datasources/DataSourceUtils.scala | 8 ++-- .../spark/sql/FileBasedDataSourceSuite.scala | 38 +++++++++++++++---- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 969fae7c1a1ea..38184a422ae60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -30,10 +30,10 @@ object DataSourceUtils { * in `FileFormat`. 
* * Unsupported data types of csv, json, orc, and parquet are as follows; - * csv -> Interval, Null, Array, Map, Struct - * json -> Interval - * orc -> Interval, Null - * parquet -> Interval, Null + * csv -> R/W: Interval, Null, Array, Map, Struct + * json -> R/W: Interval + * orc -> R/W: Interval, Null + * parquet -> R/W: Interval, Null */ def verifySchema(format: FileFormat, schema: StructType): Unit = { def throwUnsupportedException(dataType: DataType): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 880c1460359e7..51f0385209815 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -206,10 +206,10 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } // Unsupported data types of csv, json, orc, and parquet are as follows; - // csv -> Interval, Null, Array, Map, Struct - // json -> Interval - // orc -> Interval, Null - // parquet -> Interval, Null + // csv -> R/W: Interval, Null, Array, Map, Struct + // json -> R/W: Interval + // orc -> R/W: Interval, Null + // parquet -> R/W: Interval, Null test("SPARK-24204 error handling for unsupported data types") { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath @@ -219,24 +219,46 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo assert(msg.contains("CSV data source does not support struct data type")) msg = intercept[UnsupportedOperationException] { - Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir) + val schema = StructType.fromDDL("a struct") + spark.range(1).write.mode("overwrite").csv(csvDir) + spark.read.schema(schema).csv(csvDir).collect() + }.getMessage + assert(msg.contains("CSV data source does not support struct data type")) + + msg = intercept[UnsupportedOperationException] { + Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.mode("overwrite").csv(csvDir) }.getMessage assert(msg.contains("CSV data source does not support map data type")) msg = intercept[UnsupportedOperationException] { - Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir) + val schema = StructType.fromDDL("a map") + spark.range(1).write.mode("overwrite").csv(csvDir) + spark.read.schema(schema).csv(csvDir).collect() + }.getMessage + assert(msg.contains("CSV data source does not support map data type")) + + msg = intercept[UnsupportedOperationException] { + Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands") + .write.mode("overwrite").csv(csvDir) }.getMessage assert(msg.contains("CSV data source does not support array data type")) + msg = intercept[UnsupportedOperationException] { + val schema = StructType.fromDDL("a array") + spark.range(1).write.mode("overwrite").csv(csvDir) + spark.read.schema(schema).csv(csvDir).collect() + }.getMessage + assert(msg.contains("CSV data source does not support array data type")) + msg = intercept[UnsupportedOperationException] { Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors") - .write.csv(csvDir) + .write.mode("overwrite").csv(csvDir) }.getMessage assert(msg.contains("CSV data source does not support array data type")) msg = intercept[UnsupportedOperationException] { val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) - spark.range(1).write.csv(csvDir) + 
spark.range(1).write.mode("overwrite").csv(csvDir) spark.read.schema(schema).csv(csvDir).collect() }.getMessage assert(msg.contains("CSV data source does not support array data type.")) From df1a67f8409c0c81016f5ae9adfe608f0a988273 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sat, 16 Jun 2018 10:41:27 +0900 Subject: [PATCH 08/12] Keep backward-compatibility --- .../datasources/DataSourceUtils.scala | 27 ++++++-- .../datasources/csv/CSVFileFormat.scala | 4 +- .../datasources/json/JsonFileFormat.scala | 4 +- .../datasources/orc/OrcFileFormat.scala | 4 +- .../parquet/ParquetFileFormat.scala | 4 +- .../spark/sql/FileBasedDataSourceSuite.scala | 69 +++++++++++++++++-- .../spark/sql/hive/orc/OrcFileFormat.scala | 4 +- 7 files changed, 98 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 38184a422ae60..1c0a5304d68c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -19,11 +19,26 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.types._ object DataSourceUtils { + /** + * Verify if the schema is supported in datasource in write path. + */ + def verifyWriteSchema(format: FileFormat, schema: StructType): Unit = { + verifySchema(format, schema, isReadPath = false) + } + + /** + * Verify if the schema is supported in datasource in read path. + */ + def verifyReadSchema(format: FileFormat, schema: StructType): Unit = { + verifySchema(format, schema, isReadPath = true) + } + /** * Verify if the schema is supported in datasource. 
This verification should be done * in a driver side, e.g., `prepareWrite`, `buildReader`, and `buildReaderWithPartitionValues` @@ -31,11 +46,11 @@ object DataSourceUtils { * * Unsupported data types of csv, json, orc, and parquet are as follows; * csv -> R/W: Interval, Null, Array, Map, Struct - * json -> R/W: Interval - * orc -> R/W: Interval, Null + * json -> W: Interval + * orc -> W: Interval, Null * parquet -> R/W: Interval, Null */ - def verifySchema(format: FileFormat, schema: StructType): Unit = { + private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = { def throwUnsupportedException(dataType: DataType): Unit = { throw new UnsupportedOperationException( s"$format data source does not support ${dataType.simpleString} data type.") @@ -56,10 +71,14 @@ object DataSourceUtils { verifyType(keyType) verifyType(valueType) + case _: CalendarIntervalType if isReadPath && format.isInstanceOf[JsonFileFormat] || + isReadPath && format.isInstanceOf[OrcFileFormat] => + case udt: UserDefinedType[_] => verifyType(udt.sqlType) // For JSON backward-compatibility - case NullType if format.isInstanceOf[JsonFileFormat] => + case NullType if format.isInstanceOf[JsonFileFormat] || + (isReadPath && format.isInstanceOf[OrcFileFormat]) => case _ => throwUnsupportedException(dataType) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 12b547e4e41a9..fa366ccce6b61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -66,7 +66,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyWriteSchema(this, dataSchema) val conf = job.getConfiguration val csvOptions = new CSVOptions( options, @@ -98,7 +98,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyReadSchema(this, dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 36310a5939b57..383bff1375a93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -65,7 +65,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyWriteSchema(this, dataSchema) val conf = job.getConfiguration val parsedOptions = new JSONOptions( @@ -98,7 +98,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] 
= { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyReadSchema(this, dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 3f2982fa19d63..df488a748e3e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -89,7 +89,7 @@ class OrcFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyWriteSchema(this, dataSchema) val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) @@ -143,7 +143,7 @@ class OrcFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyReadSchema(this, dataSchema) if (sparkSession.sessionState.conf.orcFilterPushDown) { OrcFilters.createFilter(dataSchema, filters).foreach { f => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index a1a8d71fd4b64..9602a08911dea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -78,7 +78,7 @@ class ParquetFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyWriteSchema(this, dataSchema) val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) @@ -303,7 +303,7 @@ class ParquetFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyReadSchema(this, dataSchema) hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 51f0385209815..a308d6bd9f051 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -207,8 +207,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo // Unsupported data types of csv, json, orc, and parquet are as follows; // csv -> R/W: Interval, Null, Array, Map, Struct - // json -> R/W: Interval - // orc -> R/W: Interval, Null + // json -> W: Interval + // orc -> W: Interval, Null // parquet -> R/W: Interval, Null test("SPARK-24204 error handling for unsupported data types") { withTempDir { dir => @@ -267,7 +267,39 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo withTempDir { dir => val tempDir = new File(dir, "files").getCanonicalPath - Seq("parquet", "orc", "json", "csv").foreach { 
format => + Seq("orc", "json").foreach { format => + // write path + var msg = intercept[AnalysisException] { + sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.contains("Cannot save interval data type into external storage.")) + + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new IntervalData()) + sql("select testType()").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support calendarinterval data type.")) + + // read path + // We expect the types below should be passed for backward-compatibility + + // Interval type + var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + + // UDT having interval data + schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + } + } + + withTempDir { dir => + val tempDir = new File(dir, "files").getCanonicalPath + + Seq("parquet", "csv").foreach { format => // write path var msg = intercept[AnalysisException] { sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) @@ -299,7 +331,36 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo .contains(s"$format data source does not support calendarinterval data type.")) } - Seq("parquet", "orc", "csv").foreach { format => + Seq("orc").foreach { format => + // write path + var msg = intercept[UnsupportedOperationException] { + sql("select null").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support null data type.")) + + msg = intercept[UnsupportedOperationException] { + spark.udf.register("testType", () => new NullData()) + sql("select testType()").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(s"$format data source does not support null data type.")) + + // read path + // We expect the types below should be passed for backward-compatibility + + // Null type + var schema = StructType(StructField("a", NullType, true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + + // UDT having null data + schema = StructType(StructField("a", new NullUDT(), true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + } + + Seq("parquet", "csv").foreach { format => // write path var msg = intercept[UnsupportedOperationException] { sql("select null").write.format(format).mode("overwrite").save(tempDir) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index b36b2ec3ff273..dd2144c5fcea8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -72,7 +72,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable job: Job, options: Map[String, String], 
dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyWriteSchema(this, dataSchema) val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) @@ -123,7 +123,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifySchema(this, dataSchema) + DataSourceUtils.verifyReadSchema(this, dataSchema) if (sparkSession.sessionState.conf.orcFilterPushDown) { // Sets pushed predicates From 6303e49f493177d347d12886198ae88539d30b3c Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 24 Jun 2018 11:51:35 +0900 Subject: [PATCH 09/12] Brush up code --- .../datasources/DataSourceUtils.scala | 19 +++++-- .../datasources/json/JsonSuite.scala | 49 ------------------- 2 files changed, 16 insertions(+), 52 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 1c0a5304d68c1..5f415d77ed268 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.types._ @@ -60,7 +61,8 @@ object DataSourceUtils { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | StringType | BinaryType | DateType | TimestampType | _: DecimalType => - case _: StructType | _: ArrayType | _: MapType if format.isInstanceOf[CSVFileFormat] => + case _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType + if format.isInstanceOf[CSVFileFormat] => throwUnsupportedException(dataType) case st: StructType => st.foreach { f => verifyType(f.dataType) } @@ -71,8 +73,10 @@ object DataSourceUtils { verifyType(keyType) verifyType(valueType) - case _: CalendarIntervalType if isReadPath && format.isInstanceOf[JsonFileFormat] || - isReadPath && format.isInstanceOf[OrcFileFormat] => + // JSON and ORC don't support an Interval type, but we pass it in read pass + // for back-compatibility. 
+ case _: CalendarIntervalType if isReadPath && + (format.isInstanceOf[JsonFileFormat] | format.isInstanceOf[OrcFileFormat]) => case udt: UserDefinedType[_] => verifyType(udt.sqlType) @@ -80,6 +84,15 @@ object DataSourceUtils { case NullType if format.isInstanceOf[JsonFileFormat] || (isReadPath && format.isInstanceOf[OrcFileFormat]) => + // Actually we won't pass in unsupported data types below, this is a safety check + case _: CalendarIntervalType if format.isInstanceOf[JsonFileFormat] => + throwUnsupportedException(dataType) + + case _: CalendarIntervalType | _: NullType + if format.isInstanceOf[ParquetFileFormat] || format.isInstanceOf[OrcFileFormat] => + throwUnsupportedException(dataType) + + // We keep this default case for safeguards case _ => throwUnsupportedException(dataType) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index db57abec36c59..897424daca0cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2494,53 +2494,4 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exception.getMessage.contains("encoding must not be included in the blacklist")) } } - - test("SPARK-24204 error handling for unsupported data types") { - withTempDir { dir => - val jsonDir = new File(dir, "json").getCanonicalPath - - // write path - var msg = intercept[AnalysisException] { - sql("select interval 1 days").write.mode("overwrite").json(jsonDir) - }.getMessage - assert(msg.contains("Cannot save interval data type into external storage.")) - - msg = intercept[UnsupportedOperationException] { - spark.udf.register("testType", () => new IntervalData()) - sql("select testType()").write.mode("overwrite").json(jsonDir) - }.getMessage - assert(msg.contains("JSON data source does not support calendarinterval data type.")) - - // read path - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) - spark.range(1).write.mode("overwrite").json(jsonDir) - spark.read.schema(schema).json(jsonDir).collect() - }.getMessage - assert(msg.contains("JSON data source does not support calendarinterval data type.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) - spark.range(1).write.mode("overwrite").json(jsonDir) - spark.read.schema(schema).json(jsonDir).collect() - }.getMessage - assert(msg.contains("JSON data source does not support calendarinterval data type.")) - } - } -} - -object TestingUDT { - - @SQLUserDefinedType(udt = classOf[IntervalUDT]) - private[sql] class IntervalData extends Serializable - - private[sql] class IntervalUDT extends UserDefinedType[IntervalData] { - - override def sqlType: DataType = CalendarIntervalType - override def serialize(obj: IntervalData): Any = - throw new NotImplementedError("Not implemented") - override def deserialize(datum: Any): IntervalData = - throw new NotImplementedError("Not implemented") - override def userClass: Class[IntervalData] = classOf[IntervalData] - } } From 6301fb4951136939146d68a928b3cc48dd1e2173 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 24 Jun 2018 12:05:16 +0900 Subject: [PATCH 10/12] Spit test cases into pieces --- 
.../apache/spark/sql/FileBasedDataSourceSuite.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index a308d6bd9f051..ecc40f945377d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -210,7 +210,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo // json -> W: Interval // orc -> W: Interval, Null // parquet -> R/W: Interval, Null - test("SPARK-24204 error handling for unsupported data types") { + test("SPARK-24204 error handling for unsupported Array/Map/Struct types - csv") { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath var msg = intercept[UnsupportedOperationException] { @@ -263,7 +263,9 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo }.getMessage assert(msg.contains("CSV data source does not support array data type.")) } + } + test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") { withTempDir { dir => val tempDir = new File(dir, "files").getCanonicalPath @@ -330,6 +332,12 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support calendarinterval data type.")) } + } + } + + test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") { + withTempDir { dir => + val tempDir = new File(dir, "files").getCanonicalPath Seq("orc").foreach { format => // write path From 50e7b113e976c669d981a9a93c10967162bec2c0 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 27 Jun 2018 11:27:51 +0900 Subject: [PATCH 11/12] Review applied --- .../spark/sql/FileBasedDataSourceSuite.scala | 51 +++++++------------ 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index ecc40f945377d..86f9647b4ac4c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -269,8 +269,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo withTempDir { dir => val tempDir = new File(dir, "files").getCanonicalPath - Seq("orc", "json").foreach { format => - // write path + // write path + Seq("csv", "json", "parquet", "orc").foreach { format => var msg = intercept[AnalysisException] { sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) }.getMessage @@ -282,41 +282,11 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo }.getMessage assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support calendarinterval data type.")) - - // read path - // We expect the types below should be passed for backward-compatibility - - // Interval type - var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) - spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() - - // UDT having interval data - schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) - 
spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() } - } - - withTempDir { dir => - val tempDir = new File(dir, "files").getCanonicalPath + // read path Seq("parquet", "csv").foreach { format => - // write path - var msg = intercept[AnalysisException] { - sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) - }.getMessage - assert(msg.contains("Cannot save interval data type into external storage.")) - - msg = intercept[UnsupportedOperationException] { - spark.udf.register("testType", () => new IntervalData()) - sql("select testType()").write.format(format).mode("overwrite").save(tempDir) - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support calendarinterval data type.")) - - // read path - msg = intercept[UnsupportedOperationException] { + var msg = intercept[UnsupportedOperationException] { val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) spark.range(1).write.format(format).mode("overwrite").save(tempDir) spark.read.schema(schema).format(format).load(tempDir).collect() @@ -332,6 +302,19 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support calendarinterval data type.")) } + + // We expect the types below should be passed for backward-compatibility + Seq("orc", "json").foreach { format => + // Interval type + var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + + // UDT having interval data + schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + } } } From 1cfc7b02089401eca2f17db55b113e6620f398be Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 27 Jun 2018 12:10:32 +0900 Subject: [PATCH 12/12] Fix --- .../datasources/DataSourceUtils.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 5f415d77ed268..c5347218c4b40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -61,7 +61,8 @@ object DataSourceUtils { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | StringType | BinaryType | DateType | TimestampType | _: DecimalType => - case _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType + // All the unsupported types for CSV + case _: NullType | _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType if format.isInstanceOf[CSVFileFormat] => throwUnsupportedException(dataType) @@ -73,23 +74,27 @@ object DataSourceUtils { verifyType(keyType) verifyType(valueType) + case udt: UserDefinedType[_] => verifyType(udt.sqlType) + + // Interval type not supported in all the write path + case _: CalendarIntervalType if !isReadPath => + throwUnsupportedException(dataType) + // JSON and ORC don't support an Interval type, but we pass it in read pass 
// for back-compatibility. - case _: CalendarIntervalType if isReadPath && - (format.isInstanceOf[JsonFileFormat] | format.isInstanceOf[OrcFileFormat]) => + case _: CalendarIntervalType if format.isInstanceOf[JsonFileFormat] || + format.isInstanceOf[OrcFileFormat] => - case udt: UserDefinedType[_] => verifyType(udt.sqlType) + // Interval type not supported in the other read path + case _: CalendarIntervalType => + throwUnsupportedException(dataType) - // For JSON backward-compatibility - case NullType if format.isInstanceOf[JsonFileFormat] || + // For JSON & ORC backward-compatibility + case _: NullType if format.isInstanceOf[JsonFileFormat] || (isReadPath && format.isInstanceOf[OrcFileFormat]) => - // Actually we won't pass in unsupported data types below, this is a safety check - case _: CalendarIntervalType if format.isInstanceOf[JsonFileFormat] => - throwUnsupportedException(dataType) - - case _: CalendarIntervalType | _: NullType - if format.isInstanceOf[ParquetFileFormat] || format.isInstanceOf[OrcFileFormat] => + // Null type not supported in the other path + case _: NullType => throwUnsupportedException(dataType) // We keep this default case for safeguards
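Illustrative summary (not part of the patch): a condensed, self-contained restatement of the rule the final match above is intended to encode. The helper name and the string format tags are placeholders; the real check dispatches on the `FileFormat` instance together with `isReadPath`.

    import org.apache.spark.sql.types._

    // Returns true when `dataType` should be rejected for the given format and path.
    def isUnsupported(format: String, dataType: DataType, isReadPath: Boolean): Boolean =
      (format, dataType) match {
        // CSV rejects interval, null and nested types on both the read and the write path.
        case ("csv", _: CalendarIntervalType | _: NullType | _: StructType | _: ArrayType | _: MapType) =>
          true
        // No built-in file source can write an interval column.
        case (_, _: CalendarIntervalType) if !isReadPath => true
        // JSON and ORC tolerate interval schemas on the read path for backward-compatibility.
        case ("json" | "orc", _: CalendarIntervalType) => false
        case (_, _: CalendarIntervalType) => true
        // JSON keeps NullType on both paths; ORC only on the read path.
        case ("json", _: NullType) => false
        case ("orc", _: NullType) => !isReadPath
        case (_, _: NullType) => true
        // Atomic and nested types are handled by the recursive walk in the real code.
        case _ => false
      }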