From 63818c10539598e4ca32dff8774e945c0a4c2c86 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 29 Jun 2018 00:32:44 +0800 Subject: [PATCH 01/10] refactor schema validation --- .../sql/execution/DataSourceScanExec.scala | 1 + .../datasources/DataSourceUtils.scala | 70 ++++--------------- .../execution/datasources/FileFormat.scala | 14 +++- .../datasources/FileFormatWriter.scala | 4 +- .../datasources/csv/CSVFileFormat.scala | 9 ++- .../datasources/json/JsonFileFormat.scala | 11 +-- .../datasources/orc/OrcFileFormat.scala | 9 +-- .../parquet/ParquetFileFormat.scala | 9 +-- .../spark/sql/hive/orc/OrcFileFormat.scala | 7 +- .../sql/hive/orc/HiveOrcSourceSuite.scala | 22 ------ 10 files changed, 60 insertions(+), 96 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index d7f2654be0451..29a37b27ae332 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -306,6 +306,7 @@ case class FileSourceScanExec( } private lazy val inputRDD: RDD[InternalRow] = { + DataSourceUtils.verifyReadSchema(relation.fileFormat, relation.dataSchema) val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index c5347218c4b40..b4ee84d900e39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -17,10 +17,6 @@ package org.apache.spark.sql.execution.datasources -import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat -import org.apache.spark.sql.execution.datasources.json.JsonFileFormat -import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.types._ @@ -42,63 +38,27 @@ object DataSourceUtils { /** * Verify if the schema is supported in datasource. This verification should be done - * in a driver side, e.g., `prepareWrite`, `buildReader`, and `buildReaderWithPartitionValues` - * in `FileFormat`. - * - * Unsupported data types of csv, json, orc, and parquet are as follows; - * csv -> R/W: Interval, Null, Array, Map, Struct - * json -> W: Interval - * orc -> W: Interval, Null - * parquet -> R/W: Interval, Null + * in a driver side. 
*/ private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = { - def throwUnsupportedException(dataType: DataType): Unit = { - throw new UnsupportedOperationException( - s"$format data source does not support ${dataType.simpleString} data type.") - } - - def verifyType(dataType: DataType): Unit = dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => - - // All the unsupported types for CSV - case _: NullType | _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType - if format.isInstanceOf[CSVFileFormat] => - throwUnsupportedException(dataType) - - case st: StructType => st.foreach { f => verifyType(f.dataType) } - - case ArrayType(elementType, _) => verifyType(elementType) - - case MapType(keyType, valueType, _) => - verifyType(keyType) - verifyType(valueType) - - case udt: UserDefinedType[_] => verifyType(udt.sqlType) - - // Interval type not supported in all the write path - case _: CalendarIntervalType if !isReadPath => - throwUnsupportedException(dataType) - - // JSON and ORC don't support an Interval type, but we pass it in read pass - // for back-compatibility. - case _: CalendarIntervalType if format.isInstanceOf[JsonFileFormat] || - format.isInstanceOf[OrcFileFormat] => + def verifyType(dataType: DataType): Unit = { + if (!format.supportDataType(dataType, isReadPath)) { + throw new UnsupportedOperationException( + s"$format data source does not support ${dataType.simpleString} data type.") + } + dataType match { + case st: StructType => st.foreach { f => verifyType(f.dataType) } - // Interval type not supported in the other read path - case _: CalendarIntervalType => - throwUnsupportedException(dataType) + case ArrayType(elementType, _) => verifyType(elementType) - // For JSON & ORC backward-compatibility - case _: NullType if format.isInstanceOf[JsonFileFormat] || - (isReadPath && format.isInstanceOf[OrcFileFormat]) => + case MapType(keyType, valueType, _) => + verifyType(keyType) + verifyType(valueType) - // Null type not supported in the other path - case _: NullType => - throwUnsupportedException(dataType) + case udt: UserDefinedType[_] => verifyType(udt.sqlType) - // We keep this default case for safeguards - case _ => throwUnsupportedException(dataType) + case _ => + } } schema.foreach(field => verifyType(field.dataType)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 023e127888290..6076cf11b981e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{CalendarIntervalType, DataType, StructType} /** @@ -57,7 +57,7 @@ trait FileFormat { dataSchema: StructType): OutputWriterFactory /** - * Returns whether this format support returning columnar batch or not. + * Returns whether this format supports returning columnar batch or not. * * TODO: we should just have different traits for the different formats. 
*/ @@ -152,6 +152,16 @@ trait FileFormat { } } + /** + * Returns whether this format supports the given [[DataType]] in read/write path. + * + * By default all data types are supported except [[CalendarIntervalType]] in write path. + */ + def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { + case _: CalendarIntervalType if !isReadPath => false + case _ => true + } + } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 52da8356ab835..7c6ab4bc922fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -96,9 +96,11 @@ object FileFormatWriter extends Logging { val caseInsensitiveOptions = CaseInsensitiveMap(options) + val dataSchema = dataColumns.toStructType + DataSourceUtils.verifyWriteSchema(fileFormat, dataSchema) // Note: prepareWrite has side effect. It sets "job". val outputWriterFactory = - fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataColumns.toStructType) + fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema) val description = new WriteJobDescription( uuid = UUID.randomUUID().toString, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index fa366ccce6b61..a0ef30bfb15b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -66,7 +66,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifyWriteSchema(this, dataSchema) val conf = job.getConfiguration val csvOptions = new CSVOptions( options, @@ -98,7 +97,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifyReadSchema(this, dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -153,6 +151,13 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { override def hashCode(): Int = getClass.hashCode() override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat] + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { + case _: NullType | _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType => + false + case _ => true + } + } private[csv] class CsvOutputWriter( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 383bff1375a93..7a3e3fda6821d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSON import 
org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { @@ -65,8 +65,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifyWriteSchema(this, dataSchema) - val conf = job.getConfiguration val parsedOptions = new JSONOptions( options, @@ -98,8 +96,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - DataSourceUtils.verifyReadSchema(this, dataSchema) - val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -148,6 +144,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { override def hashCode(): Int = getClass.hashCode() override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat] + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { + case _: CalendarIntervalType if !isReadPath => false + case _ => true + } } private[json] class JsonOutputWriter( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index df488a748e3e5..b62acd82970b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -89,8 +89,6 @@ class OrcFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifyWriteSchema(this, dataSchema) - val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) val conf = job.getConfiguration @@ -143,8 +141,6 @@ class OrcFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifyReadSchema(this, dataSchema) - if (sparkSession.sessionState.conf.orcFilterPushDown) { OrcFilters.createFilter(dataSchema, filters).foreach { f => OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames) @@ -228,4 +224,9 @@ class OrcFileFormat } } } + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { + case _: NullType | _: CalendarIntervalType if !isReadPath => false + case _ => true + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 52a18abb55241..438765b15c732 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -78,8 +78,6 @@ class ParquetFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifyWriteSchema(this, dataSchema) - val parquetOptions = 
new ParquetOptions(options, sparkSession.sessionState.conf) val conf = ContextUtil.getConfiguration(job) @@ -303,8 +301,6 @@ class ParquetFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifyReadSchema(this, dataSchema) - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, @@ -454,6 +450,11 @@ class ParquetFileFormat } } } + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { + case _: NullType | _: CalendarIntervalType => false + case _ => true + } } object ParquetFileFormat extends Logging { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index dd2144c5fcea8..49d1f0f3fd277 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcOptions import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources.{Filter, _} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{CalendarIntervalType, DataType, NullType, StructType} import org.apache.spark.util.SerializableConfiguration /** @@ -178,6 +178,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } } } + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { + case _: NullType | _: CalendarIntervalType if !isReadPath => false + case _ => true + } } private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 69009e1b520c2..f467d820776ed 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -156,28 +156,6 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { sql("select testType()").write.mode("overwrite").orc(orcDir) }.getMessage assert(msg.contains("ORC data source does not support calendarinterval data type.")) - - // read path - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) - spark.range(1).write.mode("overwrite").orc(orcDir) - spark.read.schema(schema).orc(orcDir).collect() - }.getMessage - assert(msg.contains("ORC data source does not support calendarinterval data type.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", NullType, true) :: Nil) - spark.range(1).write.mode("overwrite").orc(orcDir) - spark.read.schema(schema).orc(orcDir).collect() - }.getMessage - assert(msg.contains("ORC data source does not support null data type.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) - spark.range(1).write.mode("overwrite").orc(orcDir) - spark.read.schema(schema).orc(orcDir).collect() - }.getMessage - assert(msg.contains("ORC data source does not support 
calendarinterval data type.")) } } } From e39cd02c29060f89f793160225806abb7189a9c5 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 3 Jul 2018 23:44:33 +0800 Subject: [PATCH 02/10] address comments --- .../datasources/DataSourceUtils.scala | 22 +----------------- .../execution/datasources/FileFormat.scala | 14 ++++------- .../datasources/csv/CSVFileFormat.scala | 12 ++++++---- .../datasources/json/JsonFileFormat.scala | 23 ++++++++++++++++--- .../datasources/orc/OrcFileFormat.scala | 21 ++++++++++++++--- .../parquet/ParquetFileFormat.scala | 19 ++++++++++++--- .../spark/sql/hive/orc/OrcFileFormat.scala | 23 +++++++++++++++---- 7 files changed, 87 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index b4ee84d900e39..18bd57dce277e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -41,26 +41,6 @@ object DataSourceUtils { * in a driver side. */ private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = { - def verifyType(dataType: DataType): Unit = { - if (!format.supportDataType(dataType, isReadPath)) { - throw new UnsupportedOperationException( - s"$format data source does not support ${dataType.simpleString} data type.") - } - dataType match { - case st: StructType => st.foreach { f => verifyType(f.dataType) } - - case ArrayType(elementType, _) => verifyType(elementType) - - case MapType(keyType, valueType, _) => - verifyType(keyType) - verifyType(valueType) - - case udt: UserDefinedType[_] => verifyType(udt.sqlType) - - case _ => - } - } - - schema.foreach(field => verifyType(field.dataType)) + schema.foreach(field => format.validateDataType(field.dataType, isReadPath)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 6076cf11b981e..9115fde51d440 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.{CalendarIntervalType, DataType, StructType} +import org.apache.spark.sql.types.{DataType, StructType} /** @@ -153,15 +153,11 @@ trait FileFormat { } /** - * Returns whether this format supports the given [[DataType]] in read/write path. - * - * By default all data types are supported except [[CalendarIntervalType]] in write path. + * Validate the given [[DataType]] in read/write path for this file format. + * If the [[DataType]] is not supported, an exception will be thrown. + * By default all data types are supported. 
*/ - def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case _: CalendarIntervalType if !isReadPath => false - case _ => true - } - + def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = {} } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index a0ef30bfb15b8..6d4b2efd268e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -152,10 +152,14 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat] - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case _: NullType | _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType => - false - case _ => true + override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + + case _ => throw new UnsupportedOperationException( + s"$this data source does not support ${dataType.simpleString} data type.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 7a3e3fda6821d..15745b5ad715c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -145,9 +145,26 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat] - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case _: CalendarIntervalType if !isReadPath => false - case _ => true + override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + + case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + + case MapType(keyType, valueType, _) => + validateDataType(keyType, isReadPath) + validateDataType(valueType, isReadPath) + + case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + + case _: NullType => + + case _: CalendarIntervalType if isReadPath => + + case _ => throw new UnsupportedOperationException( + s"$this data source does not support ${dataType.simpleString} data type.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index b62acd82970b4..20b1688e36860 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -225,8 +225,23 @@ class OrcFileFormat } } - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case _: NullType | _: CalendarIntervalType if !isReadPath => false - case _ => true + override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + + case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + + case MapType(keyType, valueType, _) => + validateDataType(keyType, isReadPath) + validateDataType(valueType, isReadPath) + + case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + + case _: NullType | _: CalendarIntervalType if isReadPath => + + case _ => throw new UnsupportedOperationException( + s"$this data source does not support ${dataType.simpleString} data type.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 438765b15c732..20b152c8bd8e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -451,9 +451,22 @@ class ParquetFileFormat } } - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case _: NullType | _: CalendarIntervalType => false - case _ => true + override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + + case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + + case MapType(keyType, valueType, _) => + validateDataType(keyType, isReadPath) + validateDataType(valueType, isReadPath) + + case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + + case _ => throw new UnsupportedOperationException( + s"$this data source does not support ${dataType.simpleString} data type.") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 49d1f0f3fd277..700e01f20ac6c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcOptions import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources.{Filter, _} -import org.apache.spark.sql.types.{CalendarIntervalType, DataType, NullType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration /** @@ -179,9 +179,24 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } } - override def 
supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case _: NullType | _: CalendarIntervalType if !isReadPath => false - case _ => true + override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + StringType | BinaryType | DateType | TimestampType | _: DecimalType => + + case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + + case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + + case MapType(keyType, valueType, _) => + validateDataType(keyType, isReadPath) + validateDataType(valueType, isReadPath) + + case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + + case _: NullType | _: CalendarIntervalType if isReadPath => + + case _ => throw new UnsupportedOperationException( + s"$this data source does not support ${dataType.simpleString} data type.") } } From e376adac922e0b6a76ece02f300b305d9774caa3 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 4 Jul 2018 22:48:53 +0800 Subject: [PATCH 03/10] address comments and validate schema in text file format --- .../datasources/DataSourceUtils.scala | 8 +- .../execution/datasources/FileFormat.scala | 5 +- .../datasources/csv/CSVFileFormat.scala | 9 +- .../datasources/json/JsonFileFormat.scala | 20 ++- .../datasources/orc/OrcFileFormat.scala | 18 ++- .../parquet/ParquetFileFormat.scala | 16 ++- .../datasources/text/TextFileFormat.scala | 12 +- .../spark/sql/FileBasedDataSourceSuite.scala | 119 ++++++++++++------ .../datasources/FileSourceStrategySuite.scala | 6 +- .../spark/sql/hive/orc/OrcFileFormat.scala | 18 ++- .../sql/hive/orc/HiveOrcSourceSuite.scala | 6 +- 11 files changed, 138 insertions(+), 99 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 18bd57dce277e..82e99190ecf14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.types._ @@ -41,6 +42,11 @@ object DataSourceUtils { * in a driver side. */ private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = { - schema.foreach(field => format.validateDataType(field.dataType, isReadPath)) + schema.foreach { field => + if (!format.supportDataType(field.dataType, isReadPath)) { + throw new AnalysisException( + s"$format data source does not support ${field.dataType.simpleString} data type.") + } + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 9115fde51d440..2c162e23644ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -153,11 +153,10 @@ trait FileFormat { } /** - * Validate the given [[DataType]] in read/write path for this file format. - * If the [[DataType]] is not supported, an exception will be thrown. + * Returns whether this format supports the given [[DataType]] in read/write path. 
* By default all data types are supported. */ - def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = {} + def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 6d4b2efd268e4..04152fdfab75d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -152,14 +152,13 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat] - override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => + StringType | BinaryType | DateType | TimestampType | _: DecimalType => true - case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) - case _ => throw new UnsupportedOperationException( - s"$this data source does not support ${dataType.simpleString} data type.") + case _ => false } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 15745b5ad715c..cb51e341bf047 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -145,26 +145,20 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat] - override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => + StringType | BinaryType | DateType | TimestampType | _: DecimalType => true - case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } - case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + case ArrayType(elementType, _) => supportDataType(elementType, isReadPath) case MapType(keyType, valueType, _) => - validateDataType(keyType, isReadPath) - validateDataType(valueType, isReadPath) + supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath) - case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + case _: NullType => true - case _: NullType => - - case _: CalendarIntervalType if isReadPath => - - case _ => throw new UnsupportedOperationException( - s"$this data source does not support ${dataType.simpleString} data type.") + case _ => false } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 20b1688e36860..f8a84395ec257 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -225,23 +225,21 @@ class OrcFileFormat } } - override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => + StringType | BinaryType | DateType | TimestampType | _: DecimalType => true - case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } - case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + case ArrayType(elementType, _) => supportDataType(elementType, isReadPath) case MapType(keyType, valueType, _) => - validateDataType(keyType, isReadPath) - validateDataType(valueType, isReadPath) + supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath) - case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) - case _: NullType | _: CalendarIntervalType if isReadPath => + case _: NullType if isReadPath => true - case _ => throw new UnsupportedOperationException( - s"$this data source does not support ${dataType.simpleString} data type.") + case _ => false } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 20b152c8bd8e1..53634d76240fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -451,22 +451,20 @@ class ParquetFileFormat } } - override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => + StringType | BinaryType | DateType | TimestampType | _: DecimalType => true - case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } - case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + case ArrayType(elementType, _) => supportDataType(elementType, isReadPath) case MapType(keyType, valueType, _) => - validateDataType(keyType, isReadPath) - validateDataType(valueType, isReadPath) + supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath) - case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) - case _ => throw new UnsupportedOperationException( - s"$this data source 
does not support ${dataType.simpleString} data type.") + case _ => false } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index e93908da43535..289923aa28a09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.{DataType, StringType, StructType} import org.apache.spark.util.SerializableConfiguration /** @@ -47,11 +47,6 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { throw new AnalysisException( s"Text data source supports only a single column, and you have ${schema.size} columns.") } - val tpe = schema(0).dataType - if (tpe != StringType) { - throw new AnalysisException( - s"Text data source supports only a string column, but you have ${tpe.simpleString}.") - } } override def isSplitable( @@ -141,6 +136,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { } } } + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { + case StringType => true + case _ => false + } } class TextOutputWriter( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 86f9647b4ac4c..a7ce952b70ac1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -205,63 +205,121 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } + // Text file format only supports string type + test("SPARK-24691 error handling for unsupported types - text") { + withTempDir { dir => + // write path + val textDir = new File(dir, "text").getCanonicalPath + var msg = intercept[AnalysisException] { + Seq(1).toDF.write.text(textDir) + }.getMessage + assert(msg.contains("Text data source does not support int data type")) + + msg = intercept[AnalysisException] { + Seq(1.2).toDF.write.text(textDir) + }.getMessage + assert(msg.contains("Text data source does not support double data type")) + + msg = intercept[AnalysisException] { + Seq(true).toDF.write.text(textDir) + }.getMessage + assert(msg.contains("Text data source does not support boolean data type")) + + msg = intercept[AnalysisException] { + Seq(1).toDF("a").selectExpr("struct(a)").write.text(textDir) + }.getMessage + assert(msg.contains("Text data source does not support struct data type")) + + msg = intercept[AnalysisException] { + Seq((Map("Tesla" -> 3))).toDF("cars").write.mode("overwrite").text(textDir) + }.getMessage + assert(msg.contains("Text data source does not support map data type")) + + msg = intercept[AnalysisException] { + Seq((Array("Tesla", "Chevy", "Ford"))).toDF("brands") + .write.mode("overwrite").text(textDir) + }.getMessage + assert(msg.contains("Text data source does not support array data type")) + + // read path + Seq("aaa").toDF.write.mode("overwrite").text(textDir) + 
msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", IntegerType, true) :: Nil) + spark.read.schema(schema).text(textDir).collect() + }.getMessage + assert(msg.contains("Text data source does not support int data type")) + + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", DoubleType, true) :: Nil) + spark.read.schema(schema).text(textDir).collect() + }.getMessage + assert(msg.contains("Text data source does not support double data type")) + + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", BooleanType, true) :: Nil) + spark.read.schema(schema).text(textDir).collect() + }.getMessage + assert(msg.contains("Text data source does not support boolean data type")) + } + } + // Unsupported data types of csv, json, orc, and parquet are as follows; - // csv -> R/W: Interval, Null, Array, Map, Struct - // json -> W: Interval - // orc -> W: Interval, Null + // csv -> R/W: Null, Array, Map, Struct + // json -> R/W: Interval + // orc -> R/W: Interval, W: Null // parquet -> R/W: Interval, Null test("SPARK-24204 error handling for unsupported Array/Map/Struct types - csv") { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath - var msg = intercept[UnsupportedOperationException] { + var msg = intercept[AnalysisException] { Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir) }.getMessage assert(msg.contains("CSV data source does not support struct data type")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { val schema = StructType.fromDDL("a struct") spark.range(1).write.mode("overwrite").csv(csvDir) spark.read.schema(schema).csv(csvDir).collect() }.getMessage assert(msg.contains("CSV data source does not support struct data type")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.mode("overwrite").csv(csvDir) }.getMessage assert(msg.contains("CSV data source does not support map data type")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { val schema = StructType.fromDDL("a map") spark.range(1).write.mode("overwrite").csv(csvDir) spark.read.schema(schema).csv(csvDir).collect() }.getMessage assert(msg.contains("CSV data source does not support map data type")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands") .write.mode("overwrite").csv(csvDir) }.getMessage assert(msg.contains("CSV data source does not support array data type")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { val schema = StructType.fromDDL("a array") spark.range(1).write.mode("overwrite").csv(csvDir) spark.read.schema(schema).csv(csvDir).collect() }.getMessage assert(msg.contains("CSV data source does not support array data type")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors") .write.mode("overwrite").csv(csvDir) }.getMessage - assert(msg.contains("CSV data source does not support array data type")) + assert(msg.contains("CSV data source does not support mydensevector data type")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) 
spark.range(1).write.mode("overwrite").csv(csvDir) spark.read.schema(schema).csv(csvDir).collect() }.getMessage - assert(msg.contains("CSV data source does not support array data type.")) + assert(msg.contains("CSV data source does not support mydensevector data type.")) } } @@ -276,17 +334,17 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo }.getMessage assert(msg.contains("Cannot save interval data type into external storage.")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { spark.udf.register("testType", () => new IntervalData()) sql("select testType()").write.format(format).mode("overwrite").save(tempDir) }.getMessage assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support calendarinterval data type.")) + .contains(s"$format data source does not support interval data type.")) } // read path Seq("parquet", "csv").foreach { format => - var msg = intercept[UnsupportedOperationException] { + var msg = intercept[AnalysisException] { val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) spark.range(1).write.format(format).mode("overwrite").save(tempDir) spark.read.schema(schema).format(format).load(tempDir).collect() @@ -294,26 +352,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support calendarinterval data type.")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) spark.range(1).write.format(format).mode("overwrite").save(tempDir) spark.read.schema(schema).format(format).load(tempDir).collect() }.getMessage assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support calendarinterval data type.")) - } - - // We expect the types below should be passed for backward-compatibility - Seq("orc", "json").foreach { format => - // Interval type - var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) - spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() - - // UDT having interval data - schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) - spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() + .contains(s"$format data source does not support interval data type.")) } } } @@ -324,13 +369,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo Seq("orc").foreach { format => // write path - var msg = intercept[UnsupportedOperationException] { + var msg = intercept[AnalysisException] { sql("select null").write.format(format).mode("overwrite").save(tempDir) }.getMessage assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support null data type.")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { spark.udf.register("testType", () => new NullData()) sql("select testType()").write.format(format).mode("overwrite").save(tempDir) }.getMessage @@ -353,13 +398,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo Seq("parquet", "csv").foreach { format => // write path - var msg = intercept[UnsupportedOperationException] { + var msg = intercept[AnalysisException] { sql("select 
null").write.format(format).mode("overwrite").save(tempDir) }.getMessage assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support null data type.")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { spark.udf.register("testType", () => new NullData()) sql("select testType()").write.format(format).mode("overwrite").save(tempDir) }.getMessage @@ -367,7 +412,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo .contains(s"$format data source does not support null data type.")) // read path - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { val schema = StructType(StructField("a", NullType, true) :: Nil) spark.range(1).write.format(format).mode("overwrite").save(tempDir) spark.read.schema(schema).format(format).load(tempDir).collect() @@ -375,7 +420,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo assert(msg.toLowerCase(Locale.ROOT) .contains(s"$format data source does not support null data type.")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) spark.range(1).write.format(format).mode("overwrite").save(tempDir) spark.read.schema(schema).format(format).load(tempDir).collect() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 8764f0c42cf9f..bed564f6bc6f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem} import org.apache.hadoop.mapreduce.Job -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -36,7 +36,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.{DataType, IntegerType, StructType} import org.apache.spark.util.Utils class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper { @@ -634,6 +634,8 @@ class TestFileFormat extends TextBasedFileFormat { (file: PartitionedFile) => { Iterator.empty } } + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = false } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 700e01f20ac6c..09bb1dbae6cae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -179,24 +179,22 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } } - override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match { + override def supportDataType(dataType: DataType, isReadPath: Boolean): 
Boolean = dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => + StringType | BinaryType | DateType | TimestampType | _: DecimalType => true - case st: StructType => st.foreach { f => validateDataType(f.dataType, isReadPath) } + case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } - case ArrayType(elementType, _) => validateDataType(elementType, isReadPath) + case ArrayType(elementType, _) => supportDataType(elementType, isReadPath) case MapType(keyType, valueType, _) => - validateDataType(keyType, isReadPath) - validateDataType(valueType, isReadPath) + supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath) - case udt: UserDefinedType[_] => validateDataType(udt.sqlType, isReadPath) + case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) - case _: NullType | _: CalendarIntervalType if isReadPath => + case _: NullType if isReadPath => true - case _ => throw new UnsupportedOperationException( - s"$this data source does not support ${dataType.simpleString} data type.") + case _ => false } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index f467d820776ed..9a179ac32b984 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -146,16 +146,16 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { }.getMessage assert(msg.contains("Cannot save interval data type into external storage.")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { sql("select null").write.mode("overwrite").orc(orcDir) }.getMessage assert(msg.contains("ORC data source does not support null data type.")) - msg = intercept[UnsupportedOperationException] { + msg = intercept[AnalysisException] { spark.udf.register("testType", () => new IntervalData()) sql("select testType()").write.mode("overwrite").orc(orcDir) }.getMessage - assert(msg.contains("ORC data source does not support calendarinterval data type.")) + assert(msg.contains("ORC data source does not support interval data type.")) } } } From cf4147f9ec9a231b77a503a880676dbd4daa75d4 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 5 Jul 2018 15:29:28 +0800 Subject: [PATCH 04/10] address comments --- .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 3 +-- .../sql/execution/datasources/json/JsonFileFormat.scala | 5 +++-- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 5 ++--- .../execution/datasources/parquet/ParquetFileFormat.scala | 3 +-- .../sql/execution/datasources/text/TextFileFormat.scala | 6 ++---- .../sql/execution/datasources/FileSourceStrategySuite.scala | 4 +--- .../scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 5 ++--- 7 files changed, 12 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 04152fdfab75d..aeb40e5a4131d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -153,8 +153,7 @@ class 
CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat] override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => true + case _: AtomicType => true case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index cb51e341bf047..a9241afba537b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -146,8 +146,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat] override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => true + case _: AtomicType => true case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } @@ -156,6 +155,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { case MapType(keyType, valueType, _) => supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath) + case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) + case _: NullType => true case _ => false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index f8a84395ec257..3a8c0add8c2f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -226,8 +226,7 @@ class OrcFileFormat } override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => true + case _: AtomicType => true case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } @@ -238,7 +237,7 @@ class OrcFileFormat case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) - case _: NullType if isReadPath => true + case _: NullType => isReadPath case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 53634d76240fe..b86b97ec7b103 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -452,8 +452,7 @@ class ParquetFileFormat } override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case BooleanType | ByteType | ShortType 
| IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => true + case _: AtomicType => true case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 289923aa28a09..8661a5395ac44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -137,10 +137,8 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { } } - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case StringType => true - case _ => false - } + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = + dataType == StringType } class TextOutputWriter( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index bed564f6bc6f7..bceaf1a9ec061 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{DataType, IntegerType, StructType} +import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.util.Utils class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper { @@ -634,8 +634,6 @@ class TestFileFormat extends TextBasedFileFormat { (file: PartitionedFile) => { Iterator.empty } } - - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = false } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 09bb1dbae6cae..d765d8751fe4b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -180,8 +180,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match { - case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | DateType | TimestampType | _: DecimalType => true + case _: AtomicType => true case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) } @@ -192,7 +191,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath) - case _: NullType if isReadPath => true + case _: NullType => isReadPath case _ => false } From 55df12858790ae10b8511b14932aeeb4c760120e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 6 Jul 2018 16:31:08 +0800 Subject: [PATCH 05/10] address one comment --- .../apache/spark/sql/execution/datasources/FileFormat.scala | 2 +- 
.../sql/execution/datasources/FileSourceStrategySuite.scala | 4 +++- .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 5 ++++- .../org/apache/spark/sql/sources/SimpleTextRelation.scala | 2 ++ 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 2c162e23644ef..b89419b4bbb32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -156,7 +156,7 @@ trait FileFormat { * Returns whether this format supports the given [[DataType]] in read/write path. * By default all data types are supported. */ - def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true + def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index bceaf1a9ec061..d90dae25ea162 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.{DataType, IntegerType, StructType} import org.apache.spark.util.Utils class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper { @@ -634,6 +634,8 @@ class TestFileFormat extends TextBasedFileFormat { (file: PartitionedFile) => { Iterator.empty } } + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 4a7cd6901923b..8acb3c3c70348 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, Out import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil} import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.SerializableJobConf /** @@ -104,6 +104,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) } } } + + // TODO: properly implement the data type support here. 
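  // The stub below simply accepts every type. A minimal sketch of what the TODO might
  // become, assuming (for illustration only) that the Hive SerDe path rejects
  // CalendarIntervalType like the built-in formats do and accepts everything else, and
  // that org.apache.spark.sql.types._ is imported; nested types are walked recursively,
  // mirroring the pattern used by the ORC and JSON implementations in this patch:
  //
  //   override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
  //     dataType match {
  //       case _: CalendarIntervalType => false
  //       case st: StructType => st.forall(f => supportDataType(f.dataType, isReadPath))
  //       case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
  //       case MapType(keyType, valueType, _) =>
  //         supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
  //       case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
  //       case _ => true
  //     }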
+ override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } class HiveOutputWriter( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 60a4638f610b3..ed4f084811217 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -115,6 +115,8 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { } } } + + override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } class SimpleTextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext) From 8740ac0823a6ef515bd3960da3768e29305d8c33 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 6 Jul 2018 17:02:40 +0800 Subject: [PATCH 06/10] Revert "address one comment" This reverts commit 16beafa4f128d3d592bf1f08c8e9b29770ae5a8a. --- .../apache/spark/sql/execution/datasources/FileFormat.scala | 2 +- .../sql/execution/datasources/FileSourceStrategySuite.scala | 4 +--- .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 5 +---- .../org/apache/spark/sql/sources/SimpleTextRelation.scala | 2 -- 4 files changed, 3 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index b89419b4bbb32..2c162e23644ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -156,7 +156,7 @@ trait FileFormat { * Returns whether this format supports the given [[DataType]] in read/write path. * By default all data types are supported. 
*/ - def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean + def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index d90dae25ea162..bceaf1a9ec061 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{DataType, IntegerType, StructType} +import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.util.Utils class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper { @@ -634,8 +634,6 @@ class TestFileFormat extends TextBasedFileFormat { (file: PartitionedFile) => { Iterator.empty } } - - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 8acb3c3c70348..4a7cd6901923b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, Out import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil} import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableJobConf /** @@ -104,9 +104,6 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) } } } - - // TODO: properly implement the data type support here. 
- override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } class HiveOutputWriter( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index ed4f084811217..60a4638f610b3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -115,8 +115,6 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { } } } - - override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true } class SimpleTextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext) From d4b4d13201bd9fc6a2d763319233a1da2100129e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 11 Jul 2018 23:03:04 +0800 Subject: [PATCH 07/10] update the validate position of read path --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 1 - .../org/apache/spark/sql/execution/datasources/DataSource.scala | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 29a37b27ae332..d7f2654be0451 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -306,7 +306,6 @@ case class FileSourceScanExec( } private lazy val inputRDD: RDD[InternalRow] = { - DataSourceUtils.verifyReadSchema(relation.fileFormat, relation.dataSchema) val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index f16d824201e77..0c3d9a4895fe2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -396,6 +396,7 @@ case class DataSource( hs.partitionSchema.map(_.name), "in the partition schema", equality) + DataSourceUtils.verifyReadSchema(hs.fileFormat, hs.dataSchema) case _ => SchemaUtils.checkColumnNameDuplication( relation.schema.map(_.name), From 757b82a248f019a7f722702b7fd65f87a6ef0d17 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 11 Jul 2018 23:44:41 +0800 Subject: [PATCH 08/10] remove duplicated check in hive orc --- .../scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index d765d8751fe4b..20090696ec3fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -72,7 +72,6 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - DataSourceUtils.verifyWriteSchema(this, dataSchema) val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) @@ -123,7 +122,6 @@ class 
OrcFileFormat extends FileFormat with DataSourceRegister with Serializable filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - DataSourceUtils.verifyReadSchema(this, dataSchema) if (sparkSession.sessionState.conf.orcFilterPushDown) { // Sets pushed predicates From 9ed3a7d3f94aed00fb8087d4a6318c0eb4f8da7d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Jul 2018 00:27:00 +0800 Subject: [PATCH 09/10] add back some test cases --- .../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 9a179ac32b984..fb4957ed943a7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -156,6 +156,21 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { sql("select testType()").write.mode("overwrite").orc(orcDir) }.getMessage assert(msg.contains("ORC data source does not support interval data type.")) + + // read path + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support calendarinterval data type.")) + + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support interval data type.")) } } } From 13de60ec3916b8396e6fe04e755eeb7d70c54b3a Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Jul 2018 11:48:25 +0800 Subject: [PATCH 10/10] fix DDSuite --- .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 270ed7f80197c..ca95aad3976e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2513,7 +2513,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { test("alter datasource table add columns - text format not supported") { withTable("t1") { - sql("CREATE TABLE t1 (c1 int) USING text") + sql("CREATE TABLE t1 (c1 string) USING text") val e = intercept[AnalysisException] { sql("ALTER TABLE t1 ADD COLUMNS (c2 int)") }.getMessage