From eb697e14eaeb17a93bc0e49f32e39337b7d40532 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Nov 2016 18:47:46 +0900 Subject: [PATCH 01/11] Throws an exception before execution for unsupported types in Json, CSV and text functionailities --- .../expressions/jsonExpressions.scala | 15 +++++++ .../apache/spark/sql/DataFrameReader.scala | 8 +++- .../datasources/csv/CSVFileFormat.scala | 17 ++++++-- .../datasources/json/JsonFileFormat.scala | 12 +++++- .../datasources/text/TextFileFormat.scala | 11 +++-- .../apache/spark/sql/JsonFunctionsSuite.scala | 12 ++++++ .../execution/datasources/csv/CSVSuite.scala | 29 ++++++++----- .../datasources/json/JsonSuite.scala | 31 ++++++++++++++ .../datasources/text/TextSuite.scala | 42 ++++++++++++++----- 9 files changed, 144 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index b61583d0dafb6..79493265ec7c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -496,6 +496,21 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: override def dataType: DataType = schema + override def checkInputDataTypes(): TypeCheckResult = { + if (StringType.acceptsType(child.dataType)) { + try { + JacksonUtils.verifySchema(schema) + TypeCheckResult.TypeCheckSuccess + } catch { + case e: UnsupportedOperationException => + TypeCheckResult.TypeCheckFailure(e.getMessage) + } + } else { + TypeCheckResult.TypeCheckFailure( + s"$prettyName requires that the expression is a string expression.") + } + } + override def nullSafeEval(json: Any): Any = { try parser.parse(json.toString).head catch { case _: SparkSQLJsonProcessingException => null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index a77937efd7e15..5594956158535 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.Partition import org.apache.spark.annotation.InterfaceStability import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.json.{JacksonParser, JacksonUtils, JSONOptions} import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.jdbc._ @@ -329,6 +329,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { columnNameOfCorruptRecord, parsedOptions) } + if (parsedOptions.failFast) { + // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" + // mode, allows to read values as null for unsupported types. In case of "DROPMALFORMED" + // mode, drops records only containing non-null values in unsupported types. 
+ JacksonUtils.verifySchema(schema) + } val parsed = jsonRDD.mapPartitions { iter => val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions) iter.flatMap(parser.parse) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index a3691158ee758..1dbc66315d0a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -143,6 +143,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + if (csvOptions.failFast) { + // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" + // mode, allows to read values as null for unsupported types. In case of "DROPMALFORMED" + // mode, drops records only containing non-null values in unsupported types. We should use + // `requiredSchema` instead of whole schema `dataSchema` here to not to break the original + // behaviour. + verifySchema(requiredSchema) + } + (file: PartitionedFile) => { val lineIterator = { val conf = broadcastedHadoopConf.value.value @@ -222,18 +231,18 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { } private def verifySchema(schema: StructType): Unit = { - def verifyType(dataType: DataType): Unit = dataType match { + def verifyType(name: String, dataType: DataType): Unit = dataType match { case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BooleanType | _: DecimalType | TimestampType | DateType | StringType => - case udt: UserDefinedType[_] => verifyType(udt.sqlType) + case udt: UserDefinedType[_] => verifyType(name, udt.sqlType) case _ => throw new UnsupportedOperationException( - s"CSV data source does not support ${dataType.simpleString} data type.") + s"Unable to convert column $name of type ${dataType.simpleString} to CSV.") } - schema.foreach(field => verifyType(field.dataType)) + schema.foreach(field => verifyType(field.name, field.dataType)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 0e38aefecb673..95d53d08bca33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JacksonUtils, JSONOptions} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextOutputWriter @@ -75,6 +75,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { + JacksonUtils.verifySchema(dataSchema) val conf = job.getConfiguration val 
parsedOptions: JSONOptions = new JSONOptions(options) parsedOptions.compressionCodec.foreach { codec => @@ -110,6 +111,15 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) + if (parsedOptions.failFast) { + // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" + // mode, allows to read values as null for unsupported types. In case of "DROPMALFORMED" + // mode, drops records only containing non-null values in unsupported types. We should use + // `requiredSchema` instead of whole schema `dataSchema` here to not to break the original + // behaviour. + JacksonUtils.verifySchema(requiredSchema) + } + (file: PartitionedFile) => { val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 8e043960326df..77ba60c6e9e0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -44,13 +44,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { override def shortName(): String = "text" private def verifySchema(schema: StructType): Unit = { - if (schema.size != 1) { - throw new AnalysisException( + if (schema.size > 1) { + throw new UnsupportedOperationException( s"Text data source supports only a single column, and you have ${schema.size} columns.") } + val tpe = schema(0).dataType if (tpe != StringType) { - throw new AnalysisException( + throw new UnsupportedOperationException( s"Text data source supports only a string column, but you have ${tpe.simpleString}.") } } @@ -95,9 +96,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - assert( - requiredSchema.length <= 1, - "Text data source only produces a single data column named \"value\".") + verifySchema(dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 7d63d31d9b979..e7731a39a0a61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -123,6 +123,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(null) :: Nil) } + test("from_json unsupported type") { + val df = Seq("""{"a" 1}""").toDS() + val schema = new StructType().add("a", CalendarIntervalType) + + val e = intercept[AnalysisException]{ + // Unsupported type throws an exception + df.select(from_json($"value", schema)).collect() + } + assert(e.getMessage.contains( + "Unable to convert column a of type calendarinterval to JSON.")) + } + test("to_json") { val df = Seq(Tuple1(Tuple1(1))).toDF("a") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 491ff72337a81..ea60fac4a3548 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -669,32 +669,41 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath var msg = intercept[UnsupportedOperationException] { - Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir) + Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b) as a").write.csv(csvDir) }.getMessage - assert(msg.contains("CSV data source does not support struct data type")) + assert(msg.contains("Unable to convert column a of type struct to CSV")) msg = intercept[UnsupportedOperationException] { Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir) }.getMessage - assert(msg.contains("CSV data source does not support map data type")) + assert(msg.contains("Unable to convert column cars of type map to CSV")) msg = intercept[UnsupportedOperationException] { Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir) }.getMessage - assert(msg.contains("CSV data source does not support array data type")) + assert(msg.contains("Unable to convert column brands of type array to CSV")) msg = intercept[UnsupportedOperationException] { Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors") .write.csv(csvDir) }.getMessage - assert(msg.contains("CSV data source does not support array data type")) + assert(msg.contains("Unable to convert column vectors of type array to CSV")) - msg = intercept[SparkException] { + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) + val path = s"$csvDir/tmp" + spark.range(1).write.csv(path) + spark.read.schema(schema).option("mode", "FAILFAST").csv(path).collect() + }.getMessage + assert(msg.contains("Unable to convert column a of type array to CSV")) + + msg = intercept[SparkException] { val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) - spark.range(1).write.csv(csvDir) - spark.read.schema(schema).csv(csvDir).collect() - }.getCause.getMessage - assert(msg.contains("Unsupported type: array")) + val path = s"$csvDir/tmp1" + spark.range(1).write.csv(path) + spark.read.schema(schema).csv(path).show() + }.getCause.getMessage + assert(msg.contains("Unsupported type: array")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 456052f79afcc..a44c7148c9f50 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1055,6 +1055,37 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {")) } + test("Unsupported types: FAILFAST mode") { + val schema = StructType( + StructField("a", CalendarIntervalType, true) :: Nil) + val exceptionOne = intercept[UnsupportedOperationException] { + // Read JSON data from RDD + spark.read + .option("mode", "FAILFAST") + .schema(schema) + .json(corruptRecords) + .collect() + } + + 
assert(exceptionOne.getMessage.contains( + "Unable to convert column a of type calendarinterval to JSON.")) + + val exceptionTwo = intercept[UnsupportedOperationException] { + // Read JSON data from files. + withTempDir { path => + spark.read + .option("mode", "FAILFAST") + .schema(schema) + .format("json") + .load(path.getAbsolutePath) + .collect() + } + } + + assert(exceptionTwo.getMessage.contains( + "Unable to convert column a of type calendarinterval to JSON.")) + } + test("Corrupt records: DROPMALFORMED mode") { val schemaOne = StructType( StructField("a", StringType, true) :: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index d11c2acb815d4..2d24532f9ac67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils class TextSuite extends QueryTest with SharedSQLContext { @@ -51,16 +51,18 @@ class TextSuite extends QueryTest with SharedSQLContext { } test("error handling for invalid schema") { - val tempFile = Utils.createTempDir() - tempFile.delete() - - val df = spark.range(2) - intercept[AnalysisException] { - df.write.text(tempFile.getCanonicalPath) - } - - intercept[AnalysisException] { - spark.range(2).select(df("id"), df("id") + 1).write.text(tempFile.getCanonicalPath) + withTempPath { path => + var message = intercept[UnsupportedOperationException] { + spark.range(2).write.text(path.getCanonicalPath) + }.getMessage + assert(message.contains( + "Text data source supports only a string column, but you have bigint.")) + + message = intercept[UnsupportedOperationException] { + spark.range(2).selectExpr("'a'", "'b'").write.text(path.getCanonicalPath) + }.getMessage + assert(message.contains( + "Text data source supports only a single column, and you have 2 columns.")) } } @@ -154,6 +156,24 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("error handling for unsupported data types.") { + withTempPath { path => + var msg = intercept[UnsupportedOperationException] { + Seq((1, "Tesla")) + .toDF("a", "b").selectExpr("struct(a, b) as a").write.text(path.getAbsolutePath) + }.getMessage + assert(msg.contains( + "Text data source supports only a string column, but you have struct.")) + + msg = intercept[UnsupportedOperationException] { + val schema = StructType(StructField("a", LongType, true) :: Nil) + spark.range(1).write.text(path.getAbsolutePath) + spark.read.schema(schema).text(path.getAbsolutePath).collect() + }.getMessage + assert(msg.contains("Text data source supports only a string column, but you have bigint.")) + } + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString } From 8c79da74d69f06ad30d81a69fa88a52395c807a0 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Nov 2016 18:58:38 +0900 Subject: [PATCH 02/11] Revert back the comparison change --- .../spark/sql/execution/datasources/text/TextFileFormat.scala | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 77ba60c6e9e0c..2683adbbb79d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -44,7 +44,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { override def shortName(): String = "text" private def verifySchema(schema: StructType): Unit = { - if (schema.size > 1) { + if (schema.size != 1) { throw new UnsupportedOperationException( s"Text data source supports only a single column, and you have ${schema.size} columns.") } From 144cc3b5ea5ac1c83967e3541bda9f735b5a86e2 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Nov 2016 19:06:03 +0900 Subject: [PATCH 03/11] Fix comments --- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 4 ++-- .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 4 ++-- .../spark/sql/execution/datasources/json/JsonFileFormat.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 5594956158535..742822e7557ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -331,8 +331,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } if (parsedOptions.failFast) { // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" - // mode, allows to read values as null for unsupported types. In case of "DROPMALFORMED" - // mode, drops records only containing non-null values in unsupported types. + // mode, it allows to read values as null for unsupported types. In case of "DROPMALFORMED" + // mode, it drops records only containing non-null values in unsupported types. JacksonUtils.verifySchema(schema) } val parsed = jsonRDD.mapPartitions { iter => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 1dbc66315d0a9..4d27be3edd06d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -145,8 +145,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { if (csvOptions.failFast) { // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" - // mode, allows to read values as null for unsupported types. In case of "DROPMALFORMED" - // mode, drops records only containing non-null values in unsupported types. We should use + // mode, it allows to read values as null for unsupported types. In case of "DROPMALFORMED" + // mode, it drops records only containing non-null values in unsupported types. We should use // `requiredSchema` instead of whole schema `dataSchema` here to not to break the original // behaviour. 
verifySchema(requiredSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 95d53d08bca33..f5df706a99848 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -113,8 +113,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { if (parsedOptions.failFast) { // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" - // mode, allows to read values as null for unsupported types. In case of "DROPMALFORMED" - // mode, drops records only containing non-null values in unsupported types. We should use + // mode, it allows to read values as null for unsupported types. In case of "DROPMALFORMED" + // mode, it drops records only containing non-null values in unsupported types. We should use // `requiredSchema` instead of whole schema `dataSchema` here to not to break the original // behaviour. JacksonUtils.verifySchema(requiredSchema) From 89a1fc82e83add873d6318ab14fb577b27d2afd4 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Nov 2016 19:23:02 +0900 Subject: [PATCH 04/11] Neat text tests --- .../datasources/text/TextFileFormat.scala | 1 - .../datasources/text/TextSuite.scala | 43 ++++++++++--------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 2683adbbb79d5..0de5801c94961 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -48,7 +48,6 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { throw new UnsupportedOperationException( s"Text data source supports only a single column, and you have ${schema.size} columns.") } - val tpe = schema(0).dataType if (tpe != StringType) { throw new UnsupportedOperationException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 2d24532f9ac67..84a38cd2ba5d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -22,7 +22,7 @@ import java.io.File import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} +import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -59,7 +59,28 @@ class TextSuite extends QueryTest with SharedSQLContext { "Text data source supports only a string column, but you have bigint.")) message = intercept[UnsupportedOperationException] { - spark.range(2).selectExpr("'a'", "'b'").write.text(path.getCanonicalPath) + Seq(("a", "b")).toDF().write.text(path.getCanonicalPath) + }.getMessage + assert(message.contains( + "Text data source 
supports only a single column, and you have 2 columns.")) + } + + withTempDir { dir => + var message = intercept[UnsupportedOperationException] { + val path = s"${dir.getAbsolutePath}/text1" + val schema = StructType(StructField("a", LongType, true) :: Nil) + spark.range(1).write.text(path) + spark.read.schema(schema).text(path).collect() + }.getMessage + assert(message.contains( + "Text data source supports only a string column, but you have bigint.")) + + message = intercept[UnsupportedOperationException] { + val path = s"${dir.getAbsolutePath}/text2" + val schema = StructType( + StructField("a", StringType, true) :: StructField("b", StringType, true) :: Nil) + Seq(Tuple1("a")).toDF().write.text(path) + spark.read.schema(schema).text(path).collect() }.getMessage assert(message.contains( "Text data source supports only a single column, and you have 2 columns.")) @@ -156,24 +177,6 @@ class TextSuite extends QueryTest with SharedSQLContext { } } - test("error handling for unsupported data types.") { - withTempPath { path => - var msg = intercept[UnsupportedOperationException] { - Seq((1, "Tesla")) - .toDF("a", "b").selectExpr("struct(a, b) as a").write.text(path.getAbsolutePath) - }.getMessage - assert(msg.contains( - "Text data source supports only a string column, but you have struct.")) - - msg = intercept[UnsupportedOperationException] { - val schema = StructType(StructField("a", LongType, true) :: Nil) - spark.range(1).write.text(path.getAbsolutePath) - spark.read.schema(schema).text(path.getAbsolutePath).collect() - }.getMessage - assert(msg.contains("Text data source supports only a string column, but you have bigint.")) - } - } - private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString } From ee913482feaddcc52f6f69384ed210be9ed5b7ef Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Nov 2016 19:28:05 +0900 Subject: [PATCH 05/11] Fix typos --- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 2 +- .../spark/sql/execution/datasources/json/JsonFileFormat.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 742822e7557ae..a2e553274901f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -330,7 +330,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions) } if (parsedOptions.failFast) { - // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" + // We can fail before starting to parse in case of "FAILFAST" mode. In case of "PERMISIVE" // mode, it allows to read values as null for unsupported types. In case of "DROPMALFORMED" // mode, it drops records only containing non-null values in unsupported types. 
JacksonUtils.verifySchema(schema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 4d27be3edd06d..31b724cc95434 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -144,7 +144,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) if (csvOptions.failFast) { - // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" + // We can fail before starting to parse in case of "FAILFAST" mode. In case of "PERMISIVE" // mode, it allows to read values as null for unsupported types. In case of "DROPMALFORMED" // mode, it drops records only containing non-null values in unsupported types. We should use // `requiredSchema` instead of whole schema `dataSchema` here to not to break the original diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index f5df706a99848..851e55e78b2e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -112,7 +112,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) if (parsedOptions.failFast) { - // We can fail before starting to parse in cast of "FAILFAST" mode. In case of "PERMISIVE" + // We can fail before starting to parse in case of "FAILFAST" mode. In case of "PERMISIVE" // mode, it allows to read values as null for unsupported types. In case of "DROPMALFORMED" // mode, it drops records only containing non-null values in unsupported types. 
We should use // `requiredSchema` instead of whole schema `dataSchema` here to not to break the original From 14c3b0c1f37d950be24f832718eb8396f0ca42cf Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Nov 2016 12:52:12 +0900 Subject: [PATCH 06/11] Add more test cases for other parse modes --- .../execution/datasources/csv/CSVSuite.scala | 56 +++++++++++++++---- .../datasources/json/JsonSuite.scala | 50 +++++++++++++++++ 2 files changed, 96 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index ea60fac4a3548..1dda2be663726 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -688,22 +688,58 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .write.csv(csvDir) }.getMessage assert(msg.contains("Unable to convert column vectors of type array to CSV")) + } + } - msg = intercept[UnsupportedOperationException] { + test("Unsupported types - DROPMALFORMED mode") { + withTempPath { path => + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + spark.range(1).write.csv(path.getAbsolutePath) + val df = spark.read + .schema(schema) + .option("mode", "DROPMALFORMED") + .csv(path.getAbsolutePath) + + assert(df.collect().isEmpty) + } + } + + test("Unsupported types - FAILFAST mode") { + withTempPath { path => + val msg = intercept[UnsupportedOperationException] { val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) - val path = s"$csvDir/tmp" - spark.range(1).write.csv(path) - spark.read.schema(schema).option("mode", "FAILFAST").csv(path).collect() + spark.range(1).write.csv(path.getAbsolutePath) + spark.read.schema(schema).option("mode", "FAILFAST").csv(path.getAbsolutePath).collect() }.getMessage + assert(msg.contains("Unable to convert column a of type array to CSV")) + } + } - msg = intercept[SparkException] { - val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) - val path = s"$csvDir/tmp1" + test("Unsupported types - PERMISSIVE mode") { + withTempDir { dir => + // If the values are null, it is fine to read. + val csvDir = new File(dir, "csv").getCanonicalPath + val schema = StructType(StructField("a", + StructType(StructField("b", StringType, true) :: Nil), true) :: Nil) + val path = s"$csvDir/tmp1" + Seq(Tuple1("null")).toDF().write.csv(path) + val df = spark.read + .schema(schema) + .option("nullValue", "null") + .option("mode", "PERMISSIVE") + .csv(path) + + checkAnswer(df, Row(null)) + + // If the values are non-null and the type is unsupported, it throws an exception. 
+ val msg = intercept[SparkException] { + val path = s"$csvDir/tmp2" spark.range(1).write.csv(path) - spark.read.schema(schema).csv(path).show() - }.getCause.getMessage - assert(msg.contains("Unsupported type: array")) + spark.read.schema(schema).option("mode", "PERMISSIVE").csv(path).collect() + }.getCause.getMessage + + assert(msg.contains("Unsupported type: struct")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index a44c7148c9f50..540d99da1a98c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1113,6 +1113,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(jsonDFTwo.schema === schemaTwo) } + test("Unsupported types: DROPMALFORMED mode") { + val schema = StructType( + StructField("a", CalendarIntervalType, true) :: Nil) + val rdd = sparkContext.parallelize(Seq("""{"a": 1}""")) + // Read JSON data from RDD + val df = spark.read + .option("mode", "DROPMALFORMED") + .schema(schema) + .json(rdd) + + assert(df.collect().isEmpty) + + withTempPath { path => + // Read JSON data from files. + rdd.saveAsTextFile(path.getAbsolutePath) + val df = spark.read + .option("mode", "DROPMALFORMED") + .schema(schema) + .format("json") + .load(path.getAbsolutePath) + + assert(df.collect().isEmpty) + } + } + test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") { withTempView("jsonTable") { val schema = StructType( @@ -1194,6 +1219,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("Unsupported types: PERMISSIVE mode") { + val schema = StructType( + StructField("a", CalendarIntervalType, true) :: Nil) + val rdd = sparkContext.parallelize(Seq("""{"a": 1}""")) + // Read JSON data from RDD + val df = spark.read + .option("mode", "PERMISSIVE") + .schema(schema) + .json(rdd) + + checkAnswer(df, Row(null)) + + withTempPath { path => + // Read JSON data from files. 
+ rdd.saveAsTextFile(path.getAbsolutePath) + val df = spark.read + .option("mode", "PERMISSIVE") + .schema(schema) + .format("json") + .load(path.getAbsolutePath) + + checkAnswer(df, Row(null)) + } + } + test("SPARK-13953 Rename the corrupt record field via option") { val jsonDF = spark.read .option("columnNameOfCorruptRecord", "_malformed") From d30e21849dba56fe4c2b3ebeb026b8911a287203 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Nov 2016 13:11:43 +0900 Subject: [PATCH 07/11] Another test case for reading null in JSON for unsupported types --- .../sql/execution/datasources/json/JsonSuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 540d99da1a98c..0b38918c1c019 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1125,6 +1125,16 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.collect().isEmpty) + val nullRdd = sparkContext.parallelize(Seq("""{"a": null}""")) + // Read JSON data from RDD + val nullDf = spark.read + .option("mode", "DROPMALFORMED") + .schema(schema) + .json(nullRdd) + + // This succeeds to read null even thought it is unsupported. + assert(nullDf.collect().head.get(0) == null) + withTempPath { path => // Read JSON data from files. rdd.saveAsTextFile(path.getAbsolutePath) From 4c52716f880163615211adfaed05c43c21da8b7c Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Nov 2016 13:24:08 +0900 Subject: [PATCH 08/11] Clean up tests more --- .../execution/datasources/csv/CSVSuite.scala | 20 +++++++++++++++---- .../datasources/json/JsonSuite.scala | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 1dda2be663726..163414ad8d290 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -692,8 +692,9 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("Unsupported types - DROPMALFORMED mode") { + val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) + withTempPath { path => - val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) spark.range(1).write.csv(path.getAbsolutePath) val df = spark.read .schema(schema) @@ -702,6 +703,18 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { assert(df.collect().isEmpty) } + + withTempPath { path => + Seq(Tuple1("null")).toDF().write.csv(path.getAbsolutePath) + val nullDf = spark.read + .schema(schema) + .option("nullValue", "null") + .option("mode", "DROPMALFORMED") + .csv(path.getAbsolutePath) + + // This succeeds to read null even thought it is unsupported. + checkAnswer(nullDf, Row(null)) + } } test("Unsupported types - FAILFAST mode") { @@ -719,10 +732,9 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("Unsupported types - PERMISSIVE mode") { withTempDir { dir => // If the values are null, it is fine to read. 
- val csvDir = new File(dir, "csv").getCanonicalPath val schema = StructType(StructField("a", StructType(StructField("b", StringType, true) :: Nil), true) :: Nil) - val path = s"$csvDir/tmp1" + val path = s"${dir.getAbsolutePath}/tmp1" Seq(Tuple1("null")).toDF().write.csv(path) val df = spark.read .schema(schema) @@ -734,7 +746,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { // If the values are non-null and the type is unsupported, it throws an exception. val msg = intercept[SparkException] { - val path = s"$csvDir/tmp2" + val path = s"${dir.getAbsolutePath}/tmp2" spark.range(1).write.csv(path) spark.read.schema(schema).option("mode", "PERMISSIVE").csv(path).collect() }.getCause.getMessage diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0b38918c1c019..7e9309668d58f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1133,7 +1133,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .json(nullRdd) // This succeeds to read null even thought it is unsupported. - assert(nullDf.collect().head.get(0) == null) + checkAnswer(nullDf, Row(null)) withTempPath { path => // Read JSON data from files. From 8081f5822dc45db97cb7e40e7dfbc4b89208d3f9 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Nov 2016 15:07:42 +0900 Subject: [PATCH 09/11] Minimise the changes and respect the original behaviour --- .../sql/execution/datasources/text/TextFileFormat.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 0de5801c94961..7704e1fa6b874 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -95,7 +95,10 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - verifySchema(dataSchema) + if (requiredSchema.nonEmpty) { + // `requiredSchema` can be empty when the projected column is only the partitioned column. 
+ verifySchema(requiredSchema) + } val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) From f68245ca9a76f820943268baadab9c6cc878080c Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Nov 2016 17:39:58 +0900 Subject: [PATCH 10/11] Improve error message --- .../spark/sql/catalyst/json/JacksonUtils.scala | 2 +- .../datasources/csv/CSVFileFormat.scala | 18 +++++++++--------- .../apache/spark/sql/JsonFunctionsSuite.scala | 4 ++-- .../execution/datasources/csv/CSVSuite.scala | 10 +++++----- .../execution/datasources/json/JsonSuite.scala | 4 ++-- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala index 3b23c6cd2816f..1fe9351e507dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala @@ -50,7 +50,7 @@ object JacksonUtils { case _ => throw new UnsupportedOperationException( - s"Unable to convert column $name of type ${dataType.simpleString} to JSON.") + s"Unsupported type ${dataType.simpleString} of column $name in JSON conversion.") } schema.foreach(field => verifyType(field.name, field.dataType)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 31b724cc95434..c4f2c352f769f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -231,18 +231,18 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { } private def verifySchema(schema: StructType): Unit = { - def verifyType(name: String, dataType: DataType): Unit = dataType match { - case ByteType | ShortType | IntegerType | LongType | FloatType | - DoubleType | BooleanType | _: DecimalType | TimestampType | - DateType | StringType => + def verifyType(dataType: DataType): Unit = dataType match { + case ByteType | ShortType | IntegerType | LongType | FloatType | + DoubleType | BooleanType | _: DecimalType | TimestampType | + DateType | StringType => - case udt: UserDefinedType[_] => verifyType(name, udt.sqlType) + case udt: UserDefinedType[_] => verifyType(udt.sqlType) - case _ => - throw new UnsupportedOperationException( - s"Unable to convert column $name of type ${dataType.simpleString} to CSV.") + case _ => + throw new UnsupportedOperationException( + s"CSV data source does not support ${dataType.simpleString} data type.") } - schema.foreach(field => verifyType(field.name, field.dataType)) + schema.foreach(field => verifyType(field.dataType)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index e7731a39a0a61..095ae138cb506 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -132,7 +132,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { df.select(from_json($"value", schema)).collect() } assert(e.getMessage.contains( - "Unable to convert column a of type calendarinterval to JSON.")) + "Unsupported type calendarinterval of column a 
in JSON conversion")) } test("to_json") { @@ -151,7 +151,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { df.select(to_json($"c")).collect() } assert(e.getMessage.contains( - "Unable to convert column a of type calendarinterval to JSON.")) + "Unsupported type calendarinterval of column a in JSON conversion")) } test("roundtrip in to_json and from_json") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 163414ad8d290..77a6f466578fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -671,23 +671,23 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { var msg = intercept[UnsupportedOperationException] { Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b) as a").write.csv(csvDir) }.getMessage - assert(msg.contains("Unable to convert column a of type struct to CSV")) + assert(msg.contains("CSV data source does not support struct data type")) msg = intercept[UnsupportedOperationException] { Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir) }.getMessage - assert(msg.contains("Unable to convert column cars of type map to CSV")) + assert(msg.contains("CSV data source does not support map data type")) msg = intercept[UnsupportedOperationException] { Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir) }.getMessage - assert(msg.contains("Unable to convert column brands of type array to CSV")) + assert(msg.contains("CSV data source does not support array data type")) msg = intercept[UnsupportedOperationException] { Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors") .write.csv(csvDir) }.getMessage - assert(msg.contains("Unable to convert column vectors of type array to CSV")) + assert(msg.contains("CSV data source does not support array data type")) } } @@ -725,7 +725,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { spark.read.schema(schema).option("mode", "FAILFAST").csv(path.getAbsolutePath).collect() }.getMessage - assert(msg.contains("Unable to convert column a of type array to CSV")) + assert(msg.contains("CSV data source does not support array data type")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 7e9309668d58f..9792056dbdf73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1068,7 +1068,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } assert(exceptionOne.getMessage.contains( - "Unable to convert column a of type calendarinterval to JSON.")) + "Unsupported type calendarinterval of column a in JSON conversion.")) val exceptionTwo = intercept[UnsupportedOperationException] { // Read JSON data from files. 
@@ -1083,7 +1083,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } assert(exceptionTwo.getMessage.contains( - "Unable to convert column a of type calendarinterval to JSON.")) + "Unsupported type calendarinterval of column a in JSON conversion.")) } test("Corrupt records: DROPMALFORMED mode") { From bae8db89fef3f25478894eee962b027d8b426b01 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Nov 2016 17:43:04 +0900 Subject: [PATCH 11/11] no extra change needed --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 77a6f466578fb..47a1999496d12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -669,7 +669,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath var msg = intercept[UnsupportedOperationException] { - Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b) as a").write.csv(csvDir) + Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir) }.getMessage assert(msg.contains("CSV data source does not support struct data type"))
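
A minimal end-to-end sketch of the parse-mode behaviour this series targets, assuming a local SparkSession built from a branch with the patches applied. The object name, temp-directory handling and sample record below are illustrative only; the expected outcomes follow the CSVSuite/JsonSuite tests added above (FAILFAST fails up front with UnsupportedOperationException, DROPMALFORMED drops records whose unsupported column holds a non-null value, PERMISSIVE reads the column back as null).

import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}

object UnsupportedTypeParseModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("unsupported-type-parse-modes")
      .getOrCreate()
    import spark.implicits._

    // One JSON record on disk whose column "a" we will read with an unsupported type.
    val path = Files.createTempDirectory("json-modes").resolve("data").toString
    Seq("""{"a": 1}""").toDS().write.text(path)

    // CalendarIntervalType is not supported by the JSON parser.
    val schema = StructType(StructField("a", CalendarIntervalType, nullable = true) :: Nil)

    // FAILFAST: with this series the schema is verified before parsing starts, so the
    // read fails with UnsupportedOperationException instead of failing per record.
    try {
      spark.read.option("mode", "FAILFAST").schema(schema).json(path).collect()
    } catch {
      case e: UnsupportedOperationException =>
        // e.g. "Unsupported type calendarinterval of column a in JSON conversion."
        println(s"FAILFAST: ${e.getMessage}")
    }

    // DROPMALFORMED: the record is dropped because the unsupported column is non-null.
    val dropped = spark.read.option("mode", "DROPMALFORMED").schema(schema).json(path).collect()
    println(s"DROPMALFORMED rows: ${dropped.length}") // expected: 0

    // PERMISSIVE (the default): the unsupported column is read back as null.
    val permissive = spark.read.option("mode", "PERMISSIVE").schema(schema).json(path).collect()
    println(s"PERMISSIVE first row: ${permissive.head}") // expected: [null]

    spark.stop()
  }
}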