From 5bcb6c571264b71c6b3b0ee0e351b845fa74dc60 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 30 Jan 2019 23:24:35 +0800 Subject: [PATCH 1/2] follow up --- .../apache/spark/sql/avro/AvroFileFormat.scala | 2 ++ .../org/apache/spark/sql/avro/AvroSuite.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 2 +- .../execution/datasources/DataSourceUtils.scala | 17 +---------------- .../datasources/FileFormatWriter.scala | 2 +- 5 files changed, 6 insertions(+), 19 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 7391665e42cee..4e66095f7f4a5 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -124,6 +124,8 @@ private[avro] class AvroFileFormat extends FileFormat override def shortName(): String = "avro" + override def toString(): String = "AVRO" + override def isSplitable( sparkSession: SparkSession, options: Map[String, String], diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index d803537641409..81a5cb7cd31bd 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -896,7 +896,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { sql("select testType()").write.format("avro").mode("overwrite").save(tempDir) }.getMessage assert(msg.toLowerCase(Locale.ROOT) - .contains(s"data source does not support calendarinterval data type.")) + .contains(s"avro data source does not support calendarinterval data type.")) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d48261e783dc9..db81fbd09853b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -412,7 +412,7 @@ case class DataSource( hs.partitionSchema.map(_.name), "in the partition schema", equality) - DataSourceUtils.verifyReadSchema(hs.fileFormat, hs.dataSchema) + DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema) case _ => SchemaUtils.checkColumnNameDuplication( relation.schema.map(_.name), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index a32a9405676f5..74eae94e65b00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -24,26 +24,11 @@ import org.apache.spark.sql.types._ object DataSourceUtils { - - /** - * Verify if the schema is supported in datasource in write path. - */ - def verifyWriteSchema(format: FileFormat, schema: StructType): Unit = { - verifySchema(format, schema, isReadPath = false) - } - - /** - * Verify if the schema is supported in datasource in read path. - */ - def verifyReadSchema(format: FileFormat, schema: StructType): Unit = { - verifySchema(format, schema, isReadPath = true) - } - /** * Verify if the schema is supported in datasource. This verification should be done * in a driver side. */ - private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = { + def verifySchema(format: FileFormat, schema: StructType): Unit = { schema.foreach { field => if (!format.supportDataType(field.dataType)) { throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 91e92d884ed6c..2232da4ce8427 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -98,7 +98,7 @@ object FileFormatWriter extends Logging { val caseInsensitiveOptions = CaseInsensitiveMap(options) val dataSchema = dataColumns.toStructType - DataSourceUtils.verifyWriteSchema(fileFormat, dataSchema) + DataSourceUtils.verifySchema(fileFormat, dataSchema) // Note: prepareWrite has side effect. It sets "job". val outputWriterFactory = fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema) From 30a318b100f43ccfc30e2ef32c3b09c0e7096aaf Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 31 Jan 2019 11:15:58 +0800 Subject: [PATCH 2/2] AVRO -> Avro --- .../main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 4e66095f7f4a5..c2a7f31759439 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -124,7 +124,7 @@ private[avro] class AvroFileFormat extends FileFormat override def shortName(): String = "avro" - override def toString(): String = "AVRO" + override def toString(): String = "Avro" override def isSplitable( sparkSession: SparkSession,