From d83b01e3dd557028330386912807c665394770c2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 1 Feb 2019 01:09:05 +0800 Subject: [PATCH 1/5] validate schema in data source v2 --- .../org/apache/spark/sql/avro/AvroSuite.scala | 5 +- .../execution/datasources/DataSource.scala | 7 - .../execution/datasources/v2/FileScan.scala | 23 ++- .../datasources/v2/FileWriteBuilder.scala | 14 +- .../datasources/v2/orc/OrcDataSourceV2.scala | 19 ++- .../datasources/v2/orc/OrcScan.scala | 10 +- .../datasources/v2/orc/OrcWriteBuilder.scala | 6 + .../spark/sql/FileBasedDataSourceSuite.scala | 154 ++++++++++-------- .../org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../sql/hive/orc/HiveOrcSourceSuite.scala | 2 +- 10 files changed, 154 insertions(+), 88 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 81a5cb7cd31bd..41ba9a135c6fe 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -889,14 +889,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { var msg = intercept[AnalysisException] { sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir) }.getMessage - assert(msg.contains("Cannot save interval data type into external storage.")) + assert(msg.toLowerCase(Locale.ROOT) + .contains("avro data source does not support calendarinterval data type.")) msg = intercept[AnalysisException] { spark.udf.register("testType", () => new IntervalData()) sql("select testType()").write.format("avro").mode("overwrite").save(tempDir) }.getMessage assert(msg.toLowerCase(Locale.ROOT) - .contains(s"avro data source does not support calendarinterval data type.")) + .contains("avro data source does not support calendarinterval data type.")) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 10dae8a55b47a..af6bd3a1d46cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -490,9 +490,6 @@ case class DataSource( outputColumnNames: Seq[String], physicalPlan: SparkPlan): BaseRelation = { val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames) - if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { - throw new AnalysisException("Cannot save interval data type into external storage.") - } providingClass.getConstructor().newInstance() match { case dataSource: CreatableRelationProvider => @@ -527,10 +524,6 @@ case class DataSource( * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]]. 
*/ def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = { - if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { - throw new AnalysisException("Cannot save interval data type into external storage.") - } - providingClass.getConstructor().newInstance() match { case dataSource: CreatableRelationProvider => SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 3615b15be6fd5..94a3213ac9f97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -18,15 +18,16 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} abstract class FileScan( sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex) extends Scan with Batch { + fileIndex: PartitioningAwareFileIndex, + readSchema: StructType) extends Scan with Batch { /** * Returns whether a file with `path` could be split or not. */ @@ -34,6 +35,12 @@ abstract class FileScan( false } + /** + * Returns whether this format supports the given [[DataType]] in write path. + * By default all data types are supported. 
+ */ + def supportDataType(dataType: DataType): Boolean = true + protected def partitions: Seq[FilePartition] = { val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) @@ -57,5 +64,13 @@ abstract class FileScan( partitions.toArray } - override def toBatch: Batch = this + override def toBatch: Batch = { + readSchema.foreach { field => + if (!supportDataType(field.dataType)) { + throw new AnalysisException( + s"$this data source does not support ${field.dataType.catalogString} data type.") + } + } + this + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala index ce9b52f29d7bd..15e13fc8bbe93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.SerializableConfiguration abstract class FileWriteBuilder(options: DataSourceOptions) @@ -104,12 +104,24 @@ abstract class FileWriteBuilder(options: DataSourceOptions) options: Map[String, String], dataSchema: StructType): OutputWriterFactory + /** + * Returns whether this format supports the given [[DataType]] in write path. + * By default all data types are supported. 
+ */ + def supportDataType(dataType: DataType): Boolean = true + private def validateInputs(): Unit = { assert(schema != null, "Missing input data schema") assert(queryId != null, "Missing query ID") assert(mode != null, "Missing save mode") assert(options.paths().length == 1) DataSource.validateSchema(schema) + schema.foreach { field => + if (!supportDataType(field.dataType)) { + throw new AnalysisException( + s"$this data source does not support ${field.dataType.catalogString} data type.") + } + } } private def getJobInstance(hadoopConf: Configuration, path: Path): Job = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala index db1f2f7934221..2f30ef73d3556 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ class OrcDataSourceV2 extends FileDataSourceV2 { @@ -44,3 +44,20 @@ class OrcDataSourceV2 extends FileDataSourceV2 { OrcTable(tableName, sparkSession, fileIndex, Some(schema)) } } + +object OrcDataSourceV2 { + def supportDataType(dataType: DataType): Boolean = dataType match { + case _: AtomicType => true + + case st: StructType => st.forall { f => supportDataType(f.dataType) } + + case ArrayType(elementType, _) => supportDataType(elementType) + + case MapType(keyType, valueType, _) => + supportDataType(keyType) && supportDataType(valueType) + + case udt: UserDefinedType[_] => supportDataType(udt.sqlType) + + case _ => false + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index a792ad318b398..0ce5c49bec44e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.SerializableConfiguration case class OrcScan( @@ -31,7 +31,7 @@ case class OrcScan( hadoopConf: Configuration, fileIndex: PartitioningAwareFileIndex, dataSchema: StructType, - readSchema: StructType) extends FileScan(sparkSession, fileIndex) { + readSchema: StructType) extends FileScan(sparkSession, fileIndex, readSchema) { override def isSplitable(path: Path): Boolean = true override def createReaderFactory(): PartitionReaderFactory = { @@ -40,4 +40,10 @@ case class OrcScan( OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, fileIndex.partitionSchema, readSchema) } + + override def supportDataType(dataType: DataType): Boolean = { + 
OrcDataSourceV2.supportDataType(dataType) + } + + override def toString: String = "ORC" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala index 80429d91d5e4d..4d84122e87db1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala @@ -63,4 +63,10 @@ class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(optio } } } + + override def supportDataType(dataType: DataType): Boolean = { + OrcDataSourceV2.supportDataType(dataType) + } + + override def toString: String = "ORC" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index fc87b0462bc25..aab67b573eff8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -329,83 +329,99 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") { withTempDir { dir => val tempDir = new File(dir, "files").getCanonicalPath - // TODO(SPARK-26744): support data type validating in V2 data source, and test V2 as well. - withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "orc") { - // write path - Seq("csv", "json", "parquet", "orc").foreach { format => - var msg = intercept[AnalysisException] { - sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) - }.getMessage - assert(msg.contains("Cannot save interval data type into external storage.")) - - msg = intercept[AnalysisException] { - spark.udf.register("testType", () => new IntervalData()) - sql("select testType()").write.format(format).mode("overwrite").save(tempDir) - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support calendarinterval data type.")) + Seq(true, false).foreach { useV1 => + val useV1List = if (useV1) { + "orc" + } else { + "" } + def errorMessage(format: String): String = { + s"$format data source does not support calendarinterval data type." 
+ } + + withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) { + // write path + Seq("csv", "json", "parquet", "orc").foreach { format => + var msg = intercept[AnalysisException] { + sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) + + msg = intercept[AnalysisException] { + spark.udf.register("testType", () => new IntervalData()) + sql("select testType()").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) + } - // read path - Seq("parquet", "csv").foreach { format => - var msg = intercept[AnalysisException] { - val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) - spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support calendarinterval data type.")) - - msg = intercept[AnalysisException] { - val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) - spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support calendarinterval data type.")) + // read path + Seq("parquet", "csv").foreach { format => + var msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) + + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) + } } } } } test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") { - // TODO(SPARK-26744): support data type validating in V2 data source, and test V2 as well. 
- withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc", - SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "orc") { - withTempDir { dir => - val tempDir = new File(dir, "files").getCanonicalPath - - Seq("parquet", "csv", "orc").foreach { format => - // write path - var msg = intercept[AnalysisException] { - sql("select null").write.format(format).mode("overwrite").save(tempDir) - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support null data type.")) - - msg = intercept[AnalysisException] { - spark.udf.register("testType", () => new NullData()) - sql("select testType()").write.format(format).mode("overwrite").save(tempDir) - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support null data type.")) - - // read path - msg = intercept[AnalysisException] { - val schema = StructType(StructField("a", NullType, true) :: Nil) - spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support null data type.")) - - msg = intercept[AnalysisException] { - val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) - spark.range(1).write.format(format).mode("overwrite").save(tempDir) - spark.read.schema(schema).format(format).load(tempDir).collect() - }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains(s"$format data source does not support null data type.")) + Seq(true, false).foreach { useV1 => + val useV1List = if (useV1) { + "orc" + } else { + "" + } + def errorMessage(format: String): String = { + s"$format data source does not support null data type." + } + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List, + SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) { + withTempDir { dir => + val tempDir = new File(dir, "files").getCanonicalPath + + Seq("parquet", "csv", "orc").foreach { format => + // write path + var msg = intercept[AnalysisException] { + sql("select null").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(errorMessage(format))) + + msg = intercept[AnalysisException] { + spark.udf.register("testType", () => new NullData()) + sql("select testType()").write.format(format).mode("overwrite").save(tempDir) + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(errorMessage(format))) + + // read path + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", NullType, true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(errorMessage(format))) + + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", new NullUDT(), true) :: Nil) + spark.range(1).write.format(format).mode("overwrite").save(tempDir) + spark.read.schema(schema).format(format).load(tempDir).collect() + }.getMessage + assert(msg.toLowerCase(Locale.ROOT) + .contains(errorMessage(format))) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b8c4d73f1b2b4..6f99977cfe2ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1477,7 +1477,7 @@ class 
SQLQuerySuite extends QueryTest with SharedSQLContext { val e = intercept[AnalysisException] { df.write.json(f.getCanonicalPath) } - e.message.contains("Cannot save interval data type into external storage") + e.message.contains("JSON data source does not support calendarinterval data type.") }) val e1 = intercept[AnalysisException] { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index c46512b6f5852..a1758b67433a6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -116,7 +116,7 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { var msg = intercept[AnalysisException] { sql("select interval 1 days").write.mode("overwrite").orc(orcDir) }.getMessage - assert(msg.contains("Cannot save interval data type into external storage.")) + assert(msg.contains("ORC data source does not support calendarinterval data type.")) msg = intercept[AnalysisException] { sql("select null").write.mode("overwrite").orc(orcDir) From d8240b3448a8a3392125e0bb389a3d7819faf9a8 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 14 Feb 2019 21:57:23 +0800 Subject: [PATCH 2/5] address comments --- .../execution/datasources/DataSource.scala | 3 +++ .../execution/datasources/v2/FileScan.scala | 16 ++++++++++++--- .../datasources/v2/FileWriteBuilder.scala | 16 ++++++++++++--- .../datasources/v2/orc/OrcDataSourceV2.scala | 10 +++++----- .../datasources/v2/orc/OrcScan.scala | 6 +++--- .../datasources/v2/orc/OrcWriteBuilder.scala | 6 +++--- .../spark/sql/FileBasedDataSourceSuite.scala | 20 +++++++++---------- 7 files changed, 49 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index af6bd3a1d46cc..2dbd7648fbe71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -524,6 +524,9 @@ case class DataSource( * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]]. */ def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = { + if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { + throw new AnalysisException("Cannot save interval data type into external storage.") + } providingClass.getConstructor().newInstance() match { case dataSource: CreatableRelationProvider => SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 94a3213ac9f97..bdd6a48df20ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -39,7 +39,17 @@ abstract class FileScan( * Returns whether this format supports the given [[DataType]] in write path. * By default all data types are supported. 
*/ - def supportDataType(dataType: DataType): Boolean = true + def supportsDataType(dataType: DataType): Boolean = true + + /** + * The string that represents the format that this data source provider uses. This is + * overridden by children to provide a nice alias for the data source. For example: + * + * {{{ + * override def formatName(): String = "ORC" + * }}} + */ + def formatName: String protected def partitions: Seq[FilePartition] = { val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty) @@ -66,9 +76,9 @@ abstract class FileScan( override def toBatch: Batch = { readSchema.foreach { field => - if (!supportDataType(field.dataType)) { + if (!supportsDataType(field.dataType)) { throw new AnalysisException( - s"$this data source does not support ${field.dataType.catalogString} data type.") + s"$formatName data source does not support ${field.dataType.catalogString} data type.") } } this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala index 15e13fc8bbe93..6a94248a6f0f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala @@ -108,7 +108,17 @@ abstract class FileWriteBuilder(options: DataSourceOptions) * Returns whether this format supports the given [[DataType]] in write path. * By default all data types are supported. */ - def supportDataType(dataType: DataType): Boolean = true + def supportsDataType(dataType: DataType): Boolean = true + + /** + * The string that represents the format that this data source provider uses. This is + * overridden by children to provide a nice alias for the data source. 
For example: + * + * {{{ + * override def formatName(): String = "ORC" + * }}} + */ + def formatName: String private def validateInputs(): Unit = { assert(schema != null, "Missing input data schema") @@ -117,9 +127,9 @@ abstract class FileWriteBuilder(options: DataSourceOptions) assert(options.paths().length == 1) DataSource.validateSchema(schema) schema.foreach { field => - if (!supportDataType(field.dataType)) { + if (!supportsDataType(field.dataType)) { throw new AnalysisException( - s"$this data source does not support ${field.dataType.catalogString} data type.") + s"$formatName data source does not support ${field.dataType.catalogString} data type.") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala index 2f30ef73d3556..813e3f7f25eef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala @@ -46,17 +46,17 @@ class OrcDataSourceV2 extends FileDataSourceV2 { } object OrcDataSourceV2 { - def supportDataType(dataType: DataType): Boolean = dataType match { + def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true - case st: StructType => st.forall { f => supportDataType(f.dataType) } + case st: StructType => st.forall { f => supportsDataType(f.dataType) } - case ArrayType(elementType, _) => supportDataType(elementType) + case ArrayType(elementType, _) => supportsDataType(elementType) case MapType(keyType, valueType, _) => - supportDataType(keyType) && supportDataType(valueType) + supportsDataType(keyType) && supportsDataType(valueType) - case udt: UserDefinedType[_] => supportDataType(udt.sqlType) + case udt: UserDefinedType[_] => supportsDataType(udt.sqlType) case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index 0ce5c49bec44e..3c5dc1f50d7e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -41,9 +41,9 @@ case class OrcScan( dataSchema, fileIndex.partitionSchema, readSchema) } - override def supportDataType(dataType: DataType): Boolean = { - OrcDataSourceV2.supportDataType(dataType) + override def supportsDataType(dataType: DataType): Boolean = { + OrcDataSourceV2.supportsDataType(dataType) } - override def toString: String = "ORC" + override def formatName: String = "ORC" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala index 4d84122e87db1..1aec4d872a64d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala @@ -64,9 +64,9 @@ class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(optio } } - override def supportDataType(dataType: DataType): Boolean = { - OrcDataSourceV2.supportDataType(dataType) + override def supportsDataType(dataType: DataType): Boolean = { + OrcDataSourceV2.supportsDataType(dataType) } 
- override def toString: String = "ORC" + override def formatName: String = "ORC" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index aab67b573eff8..e0c0484593d99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -335,8 +335,12 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } else { "" } - def errorMessage(format: String): String = { - s"$format data source does not support calendarinterval data type." + def errorMessage(format: String, isWrite: Boolean): String = { + if (isWrite && (useV1 || format != "orc")) { + "cannot save interval data type into external storage." + } else { + s"$format data source does not support calendarinterval data type." + } } withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) { @@ -345,13 +349,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo var msg = intercept[AnalysisException] { sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) }.getMessage - assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) - - msg = intercept[AnalysisException] { - spark.udf.register("testType", () => new IntervalData()) - sql("select testType()").write.format(format).mode("overwrite").save(tempDir) - }.getMessage - assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) + assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format, true))) } // read path @@ -361,14 +359,14 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo spark.range(1).write.format(format).mode("overwrite").save(tempDir) spark.read.schema(schema).format(format).load(tempDir).collect() }.getMessage - assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) + assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format, false))) msg = intercept[AnalysisException] { val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil) spark.range(1).write.format(format).mode("overwrite").save(tempDir) spark.read.schema(schema).format(format).load(tempDir).collect() }.getMessage - assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format))) + assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format, false))) } } } From 5b7b25889cafdb0ef0cb65926d3963a1541ae75f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 14 Feb 2019 22:08:43 +0800 Subject: [PATCH 3/5] fix avrosuite --- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 41ba9a135c6fe..81a5cb7cd31bd 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -889,15 +889,14 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { var msg = intercept[AnalysisException] { sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir) }.getMessage - assert(msg.toLowerCase(Locale.ROOT) - .contains("avro data source does not support calendarinterval data type.")) + assert(msg.contains("Cannot save interval data type into external 
storage.")) msg = intercept[AnalysisException] { spark.udf.register("testType", () => new IntervalData()) sql("select testType()").write.format("avro").mode("overwrite").save(tempDir) }.getMessage assert(msg.toLowerCase(Locale.ROOT) - .contains("avro data source does not support calendarinterval data type.")) + .contains(s"avro data source does not support calendarinterval data type.")) } } From 5f3ec83b4d7338f37bd51345edd288011856ab98 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 14 Feb 2019 22:11:46 +0800 Subject: [PATCH 4/5] fix --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6f99977cfe2ce..b8c4d73f1b2b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1477,7 +1477,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { val e = intercept[AnalysisException] { df.write.json(f.getCanonicalPath) } - e.message.contains("JSON data source does not support calendarinterval data type.") + e.message.contains("Cannot save interval data type into external storage") }) val e1 = intercept[AnalysisException] { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index a1758b67433a6..c46512b6f5852 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -116,7 +116,7 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { var msg = intercept[AnalysisException] { sql("select interval 1 days").write.mode("overwrite").orc(orcDir) }.getMessage - assert(msg.contains("ORC data source does not support calendarinterval data type.")) + assert(msg.contains("Cannot save interval data type into external storage.")) msg = intercept[AnalysisException] { sql("select null").write.mode("overwrite").orc(orcDir) From ee60027a923e1697f98a1f6b61459970489f9af3 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 15 Feb 2019 11:12:53 +0800 Subject: [PATCH 5/5] remove unnecessary change --- .../apache/spark/sql/execution/datasources/DataSource.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 2dbd7648fbe71..10dae8a55b47a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -490,6 +490,9 @@ case class DataSource( outputColumnNames: Seq[String], physicalPlan: SparkPlan): BaseRelation = { val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames) + if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { + throw new AnalysisException("Cannot save interval data type into external storage.") + } providingClass.getConstructor().newInstance() match { case dataSource: CreatableRelationProvider => @@ -527,6 +530,7 @@ case class DataSource( if 
(data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { throw new AnalysisException("Cannot save interval data type into external storage.") } + providingClass.getConstructor().newInstance() match { case dataSource: CreatableRelationProvider => SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
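
The pattern these patches converge on is small enough to show in one place. Below is a minimal, self-contained sketch (not part of the patch series) of that pattern: the recursive type check that OrcDataSourceV2.supportsDataType performs, combined with the per-field schema walk done in FileScan.toBatch and FileWriteBuilder.validateInputs. The object name SchemaValidationSketch and its placement under the v2 datasources package are illustrative choices, the latter only so the AnalysisException constructor is accessible; everything else uses the Spark types that already appear in the diffs above.

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._

object SchemaValidationSketch {

  // Accept atomic types and any nesting of them inside structs, arrays and
  // maps; unwrap UDTs to their SQL type; reject everything else (notably
  // CalendarIntervalType and NullType), as OrcDataSourceV2.supportsDataType does.
  def supportsDataType(dataType: DataType): Boolean = dataType match {
    case _: AtomicType => true
    case st: StructType => st.forall(f => supportsDataType(f.dataType))
    case ArrayType(elementType, _) => supportsDataType(elementType)
    case MapType(keyType, valueType, _) =>
      supportsDataType(keyType) && supportsDataType(valueType)
    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
    case _ => false
  }

  // The per-field check run before a batch scan is built or a write is
  // validated: the first unsupported field fails the whole schema with the
  // message format the updated tests assert on.
  def verifySchema(formatName: String, schema: StructType): Unit = {
    schema.foreach { field =>
      if (!supportsDataType(field.dataType)) {
        throw new AnalysisException(
          s"$formatName data source does not support " +
            s"${field.dataType.catalogString} data type.")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val supported = StructType(Seq(
      StructField("id", LongType),
      StructField("tags", ArrayType(StringType))))
    verifySchema("ORC", supported) // passes silently

    val unsupported = StructType(Seq(StructField("i", CalendarIntervalType)))
    // Throws AnalysisException:
    // "ORC data source does not support calendarinterval data type."
    verifySchema("ORC", unsupported)
  }
}

Keeping the recursive check in a shared object and passing only a formatName into the error path is what lets OrcScan and OrcWriteBuilder reuse the same rule on the read and write sides while producing the per-format messages the suites assert on.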