From 8169741c7ba1de048b3895c4fcd55c03dc01cacb Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 1 Mar 2017 18:58:56 +0900 Subject: [PATCH 1/5] Treat shorter/longer lengths of tokens as malformed records in CSV parser --- .../datasources/csv/UnivocityParser.scala | 67 +++++++++---------- .../resources/test-data/cars-alternative.csv | 2 +- .../resources/test-data/cars-malformed.csv | 2 +- .../test/resources/test-data/cars-null.csv | 2 +- .../src/test/resources/test-data/cars.csv | 2 +- .../src/test/resources/test-data/cars.tsv | 6 +- .../resources/test-data/cars_iso-8859-1.csv | 2 +- .../execution/datasources/csv/CSVSuite.scala | 55 +++++++-------- 8 files changed, 64 insertions(+), 74 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index e42ea3fa391f5..52c65eb807bc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -254,6 +254,17 @@ class UnivocityParser( } } + /** + * This function deals with the cases it fails to parse in PERMISSIVE mode. The failure reasons + * of this mode are 1) the shorter/longer lengths of tokens or 2) format exceptions + * (e.g., NumberFormatException). + */ + private def failedRecordWithPermissiveMode(): Option[InternalRow] = { + val row = new GenericInternalRow(requiredSchema.length) + corruptFieldIndex.foreach(row(_) = UTF8String.fromString(getCurrentInput())) + Some(row) + } + private def convertWithParseMode( tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { if (options.dropMalformed && dataSchema.length != tokens.length) { @@ -271,43 +282,27 @@ class UnivocityParser( throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { - // If a length of parsed tokens is not equal to expected one, it makes the length the same - // with the expected. If the length is shorter, it adds extra tokens in the tail. - // If longer, it drops extra tokens. - // - // TODO: Revisit this; if a length of tokens does not match an expected length in the schema, - // we probably need to treat it as a malformed record. - // See an URL below for related discussions: - // https://github.com/apache/spark/pull/16928#discussion_r102657214 - val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) { - if (dataSchema.length > tokens.length) { - tokens ++ new Array[String](dataSchema.length - tokens.length) - } else { - tokens.take(dataSchema.length) - } + if (options.permissive && dataSchema.length != tokens.length) { + failedRecordWithPermissiveMode() } else { - tokens - } - - try { - Some(convert(checkedTokens)) - } catch { - case NonFatal(e) if options.permissive => - val row = new GenericInternalRow(requiredSchema.length) - corruptFieldIndex.foreach(row(_) = UTF8String.fromString(getCurrentInput())) - Some(row) - case NonFatal(e) if options.dropMalformed => - if (numMalformedRecords < options.maxMalformedLogPerPartition) { - logWarning("Parse exception. " + - s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") - } - if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { - logWarning( - s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + - "found on this partition. Malformed records from now on will not be logged.") - } - numMalformedRecords += 1 - None + try { + Some(convert(tokens)) + } catch { + case NonFatal(e) if options.permissive => + failedRecordWithPermissiveMode() + case NonFatal(e) if options.dropMalformed => + if (numMalformedRecords < options.maxMalformedLogPerPartition) { + logWarning("Parse exception. " + + s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") + } + if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } + numMalformedRecords += 1 + None + } } } } diff --git a/sql/core/src/test/resources/test-data/cars-alternative.csv b/sql/core/src/test/resources/test-data/cars-alternative.csv index 646f7c456c866..1273bcf004f5c 100644 --- a/sql/core/src/test/resources/test-data/cars-alternative.csv +++ b/sql/core/src/test/resources/test-data/cars-alternative.csv @@ -2,4 +2,4 @@ year|make|model|comment|blank '2012'|'Tesla'|'S'| 'No comment'| 1997|Ford|E350|'Go get one now they are going fast'| -2015|Chevy|Volt +2015|Chevy|Volt|| diff --git a/sql/core/src/test/resources/test-data/cars-malformed.csv b/sql/core/src/test/resources/test-data/cars-malformed.csv index cfa378c01f1d9..c77755664b13a 100644 --- a/sql/core/src/test/resources/test-data/cars-malformed.csv +++ b/sql/core/src/test/resources/test-data/cars-malformed.csv @@ -3,4 +3,4 @@ year,make,model,comment,blank "2012","Tesla","S","No comment",,null,null 1997,Ford,E350,"Go get one now they are going fast",,null,null -2015,Chevy,,,, +2015,Chevy,,, diff --git a/sql/core/src/test/resources/test-data/cars-null.csv b/sql/core/src/test/resources/test-data/cars-null.csv index 130c0b40bbe78..8d349c6d2e620 100644 --- a/sql/core/src/test/resources/test-data/cars-null.csv +++ b/sql/core/src/test/resources/test-data/cars-null.csv @@ -2,5 +2,5 @@ year,make,model,comment,blank "2012","Tesla","S",null, 1997,Ford,E350,"Go get one now they are going fast", -null,Chevy,Volt +null,Chevy,Volt,, diff --git a/sql/core/src/test/resources/test-data/cars.csv b/sql/core/src/test/resources/test-data/cars.csv index 40ded573ade5c..a220563e9105b 100644 --- a/sql/core/src/test/resources/test-data/cars.csv +++ b/sql/core/src/test/resources/test-data/cars.csv @@ -3,5 +3,5 @@ year,make,model,comment,blank "2012","Tesla","S","No comment", 1997,Ford,E350,"Go get one now they are going fast", -2015,Chevy,Volt +2015,Chevy,Volt,, diff --git a/sql/core/src/test/resources/test-data/cars.tsv b/sql/core/src/test/resources/test-data/cars.tsv index a7bfa9a91f961..3a8ca4249f78e 100644 --- a/sql/core/src/test/resources/test-data/cars.tsv +++ b/sql/core/src/test/resources/test-data/cars.tsv @@ -1,4 +1,4 @@ year make model price comment blank -2012 Tesla S "80,000.65" -1997 Ford E350 35,000 "Go get one now they are going fast" -2015 Chevy Volt 5,000.10 +2012 Tesla S "80,000.65" +1997 Ford E350 35,000 "Go get one now they are going fast" +2015 Chevy Volt 5,000.10 diff --git a/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv b/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv index c51b6c59010f0..628acb08b6953 100644 --- a/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv +++ b/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv @@ -2,5 +2,5 @@ year "2012"þ"Tesla"þ"S"þ"No comment"þ 1997þFordþE350þ"Go get one now they are þoing fast"þ -2015þChevyþVolt +2015þChevyþVoltþþ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 95dfdf5b298e6..65d09b8571491 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -266,9 +266,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .format("csv") .option("wholeFile", wholeFile) .options(Map("header" -> "true", "mode" -> "dropmalformed")) - .load(testFile(carsFile)) + .option("comment", "~") + .load(testFile(carsMalformedFile)) - assert(cars.select("year").collect().size === 2) + assert(cars.select("year").collect().size === 1) } } @@ -290,24 +291,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .format("csv") .option("wholeFile", wholeFile) .options(Map("header" -> "true", "mode" -> "failfast")) - .load(testFile(carsFile)).collect() + .option("comment", "~") + .load(testFile(carsMalformedFile)).collect() } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt")) + assert(exception.getMessage.contains( + "Malformed line in FAILFAST mode: 2012,Tesla,S,No comment,,null,null")) } } - test("test for tokens more than the fields in the schema") { - val cars = spark - .read - .format("csv") - .option("header", "false") - .option("comment", "~") - .load(testFile(carsMalformedFile)) - - verifyCars(cars, withHeader = false, checkTypes = false) - } - test("test with null quote character") { val cars = spark.read .format("csv") @@ -974,21 +966,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("load null when the schema is larger than parsed tokens ") { - withTempPath { path => - Seq("1").toDF().write.text(path.getAbsolutePath) - val schema = StructType( - StructField("a", IntegerType, true) :: - StructField("b", IntegerType, true) :: Nil) - val df = spark.read - .schema(schema) - .option("header", "false") - .csv(path.getAbsolutePath) - - checkAnswer(df, Row(1, null)) - } - } - test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") { Seq(false, true).foreach { wholeFile => val schema = new StructType().add("a", IntegerType).add("b", TimestampType) @@ -1116,4 +1093,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { assert(df2.schema === schema) } + test("SPARK-XXXXX regard as malformed records if the length is not equal to expected one") { + val columnNameOfCorruptRecord = "_unparsed" + withTempPath { path => + Seq("1,2", "1,2,3,4").toDF().write.text(path.getAbsolutePath) + val schema = StructType( + StructField("a", IntegerType, true) :: + StructField("b", IntegerType, true) :: + StructField("c", IntegerType, true) :: + StructField(columnNameOfCorruptRecord, StringType, true) :: Nil) + val df = spark.read + .schema(schema) + .option("header", "false") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .csv(path.getAbsolutePath) + + checkAnswer(df, Row(null, null, null, "1,2") :: Row(null, null, null, "1,2,3,4") :: Nil) + } + } } From 4b9b2bd8585f8490cc05fa904ef4faba4c0439e8 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 3 Mar 2017 17:40:45 +0900 Subject: [PATCH 2/5] Fix errors in R tests --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index cbc3569795d97..6b7fb541e2815 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -254,8 +254,8 @@ test_that("read/write csv as DataFrame", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt", - "NA,Dummy,Placeholder") + "2015,Chevy,Volt,,", + "NA,Dummy,Placeholder,,") writeLines(mockLinesCsv, csvPath) # default "header" is false, inferSchema to handle "year" as "int" @@ -274,8 +274,8 @@ test_that("read/write csv as DataFrame", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt", - "Empty,Dummy,Placeholder") + "2015,Chevy,Volt,,", + "Empty,Dummy,Placeholder,,") writeLines(mockLinesCsv, csvPath) df2 <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "Empty") @@ -302,8 +302,8 @@ test_that("Support other types for options", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt", - "NA,Dummy,Placeholder") + "2015,Chevy,Volt,,", + "NA,Dummy,Placeholder,,") writeLines(mockLinesCsv, csvPath) csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true") From 3829042d5eb6b885e736aca2a77118d50c18cb0b Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 5 Mar 2017 19:45:46 +0900 Subject: [PATCH 3/5] Revert "Fix errors in R tests" This reverts commit d88a96684f50ad8674d0f1c6ad4a5f68faf271b4. --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 6b7fb541e2815..cbc3569795d97 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -254,8 +254,8 @@ test_that("read/write csv as DataFrame", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt,,", - "NA,Dummy,Placeholder,,") + "2015,Chevy,Volt", + "NA,Dummy,Placeholder") writeLines(mockLinesCsv, csvPath) # default "header" is false, inferSchema to handle "year" as "int" @@ -274,8 +274,8 @@ test_that("read/write csv as DataFrame", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt,,", - "Empty,Dummy,Placeholder,,") + "2015,Chevy,Volt", + "Empty,Dummy,Placeholder") writeLines(mockLinesCsv, csvPath) df2 <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "Empty") @@ -302,8 +302,8 @@ test_that("Support other types for options", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt,,", - "NA,Dummy,Placeholder,,") + "2015,Chevy,Volt", + "NA,Dummy,Placeholder") writeLines(mockLinesCsv, csvPath) csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true") From 031917091390caa4f70f9723b70d484d2ea37d35 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 5 Mar 2017 19:46:12 +0900 Subject: [PATCH 4/5] Revert "Treat shorter/longer lengths of tokens as malformed records in CSV parser" This reverts commit aa290ee32ef09d6d018f261c3bccb85d08259ac5. --- .../datasources/csv/UnivocityParser.scala | 67 ++++++++++--------- .../resources/test-data/cars-alternative.csv | 2 +- .../resources/test-data/cars-malformed.csv | 2 +- .../test/resources/test-data/cars-null.csv | 2 +- .../src/test/resources/test-data/cars.csv | 2 +- .../src/test/resources/test-data/cars.tsv | 6 +- .../resources/test-data/cars_iso-8859-1.csv | 2 +- .../execution/datasources/csv/CSVSuite.scala | 37 ++++++++-- 8 files changed, 74 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 52c65eb807bc4..e42ea3fa391f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -254,17 +254,6 @@ class UnivocityParser( } } - /** - * This function deals with the cases it fails to parse in PERMISSIVE mode. The failure reasons - * of this mode are 1) the shorter/longer lengths of tokens or 2) format exceptions - * (e.g., NumberFormatException). - */ - private def failedRecordWithPermissiveMode(): Option[InternalRow] = { - val row = new GenericInternalRow(requiredSchema.length) - corruptFieldIndex.foreach(row(_) = UTF8String.fromString(getCurrentInput())) - Some(row) - } - private def convertWithParseMode( tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { if (options.dropMalformed && dataSchema.length != tokens.length) { @@ -282,27 +271,43 @@ class UnivocityParser( throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { - if (options.permissive && dataSchema.length != tokens.length) { - failedRecordWithPermissiveMode() - } else { - try { - Some(convert(tokens)) - } catch { - case NonFatal(e) if options.permissive => - failedRecordWithPermissiveMode() - case NonFatal(e) if options.dropMalformed => - if (numMalformedRecords < options.maxMalformedLogPerPartition) { - logWarning("Parse exception. " + - s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") - } - if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { - logWarning( - s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + - "found on this partition. Malformed records from now on will not be logged.") - } - numMalformedRecords += 1 - None + // If a length of parsed tokens is not equal to expected one, it makes the length the same + // with the expected. If the length is shorter, it adds extra tokens in the tail. + // If longer, it drops extra tokens. + // + // TODO: Revisit this; if a length of tokens does not match an expected length in the schema, + // we probably need to treat it as a malformed record. + // See an URL below for related discussions: + // https://github.com/apache/spark/pull/16928#discussion_r102657214 + val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) { + if (dataSchema.length > tokens.length) { + tokens ++ new Array[String](dataSchema.length - tokens.length) + } else { + tokens.take(dataSchema.length) } + } else { + tokens + } + + try { + Some(convert(checkedTokens)) + } catch { + case NonFatal(e) if options.permissive => + val row = new GenericInternalRow(requiredSchema.length) + corruptFieldIndex.foreach(row(_) = UTF8String.fromString(getCurrentInput())) + Some(row) + case NonFatal(e) if options.dropMalformed => + if (numMalformedRecords < options.maxMalformedLogPerPartition) { + logWarning("Parse exception. " + + s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") + } + if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } + numMalformedRecords += 1 + None } } } diff --git a/sql/core/src/test/resources/test-data/cars-alternative.csv b/sql/core/src/test/resources/test-data/cars-alternative.csv index 1273bcf004f5c..646f7c456c866 100644 --- a/sql/core/src/test/resources/test-data/cars-alternative.csv +++ b/sql/core/src/test/resources/test-data/cars-alternative.csv @@ -2,4 +2,4 @@ year|make|model|comment|blank '2012'|'Tesla'|'S'| 'No comment'| 1997|Ford|E350|'Go get one now they are going fast'| -2015|Chevy|Volt|| +2015|Chevy|Volt diff --git a/sql/core/src/test/resources/test-data/cars-malformed.csv b/sql/core/src/test/resources/test-data/cars-malformed.csv index c77755664b13a..cfa378c01f1d9 100644 --- a/sql/core/src/test/resources/test-data/cars-malformed.csv +++ b/sql/core/src/test/resources/test-data/cars-malformed.csv @@ -3,4 +3,4 @@ year,make,model,comment,blank "2012","Tesla","S","No comment",,null,null 1997,Ford,E350,"Go get one now they are going fast",,null,null -2015,Chevy,,, +2015,Chevy,,,, diff --git a/sql/core/src/test/resources/test-data/cars-null.csv b/sql/core/src/test/resources/test-data/cars-null.csv index 8d349c6d2e620..130c0b40bbe78 100644 --- a/sql/core/src/test/resources/test-data/cars-null.csv +++ b/sql/core/src/test/resources/test-data/cars-null.csv @@ -2,5 +2,5 @@ year,make,model,comment,blank "2012","Tesla","S",null, 1997,Ford,E350,"Go get one now they are going fast", -null,Chevy,Volt,, +null,Chevy,Volt diff --git a/sql/core/src/test/resources/test-data/cars.csv b/sql/core/src/test/resources/test-data/cars.csv index a220563e9105b..40ded573ade5c 100644 --- a/sql/core/src/test/resources/test-data/cars.csv +++ b/sql/core/src/test/resources/test-data/cars.csv @@ -3,5 +3,5 @@ year,make,model,comment,blank "2012","Tesla","S","No comment", 1997,Ford,E350,"Go get one now they are going fast", -2015,Chevy,Volt,, +2015,Chevy,Volt diff --git a/sql/core/src/test/resources/test-data/cars.tsv b/sql/core/src/test/resources/test-data/cars.tsv index 3a8ca4249f78e..a7bfa9a91f961 100644 --- a/sql/core/src/test/resources/test-data/cars.tsv +++ b/sql/core/src/test/resources/test-data/cars.tsv @@ -1,4 +1,4 @@ year make model price comment blank -2012 Tesla S "80,000.65" -1997 Ford E350 35,000 "Go get one now they are going fast" -2015 Chevy Volt 5,000.10 +2012 Tesla S "80,000.65" +1997 Ford E350 35,000 "Go get one now they are going fast" +2015 Chevy Volt 5,000.10 diff --git a/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv b/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv index 628acb08b6953..c51b6c59010f0 100644 --- a/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv +++ b/sql/core/src/test/resources/test-data/cars_iso-8859-1.csv @@ -2,5 +2,5 @@ year "2012"þ"Tesla"þ"S"þ"No comment"þ 1997þFordþE350þ"Go get one now they are þoing fast"þ -2015þChevyþVoltþþ +2015þChevyþVolt diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 65d09b8571491..ba99a9c058629 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -266,10 +266,9 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .format("csv") .option("wholeFile", wholeFile) .options(Map("header" -> "true", "mode" -> "dropmalformed")) - .option("comment", "~") - .load(testFile(carsMalformedFile)) + .load(testFile(carsFile)) - assert(cars.select("year").collect().size === 1) + assert(cars.select("year").collect().size === 2) } } @@ -291,15 +290,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .format("csv") .option("wholeFile", wholeFile) .options(Map("header" -> "true", "mode" -> "failfast")) - .option("comment", "~") - .load(testFile(carsMalformedFile)).collect() + .load(testFile(carsFile)).collect() } - assert(exception.getMessage.contains( - "Malformed line in FAILFAST mode: 2012,Tesla,S,No comment,,null,null")) + assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt")) } } + test("test for tokens more than the fields in the schema") { + val cars = spark + .read + .format("csv") + .option("header", "false") + .option("comment", "~") + .load(testFile(carsMalformedFile)) + + verifyCars(cars, withHeader = false, checkTypes = false) + } + test("test with null quote character") { val cars = spark.read .format("csv") @@ -966,6 +974,21 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } + test("load null when the schema is larger than parsed tokens ") { + withTempPath { path => + Seq("1").toDF().write.text(path.getAbsolutePath) + val schema = StructType( + StructField("a", IntegerType, true) :: + StructField("b", IntegerType, true) :: Nil) + val df = spark.read + .schema(schema) + .option("header", "false") + .csv(path.getAbsolutePath) + + checkAnswer(df, Row(1, null)) + } + } + test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") { Seq(false, true).foreach { wholeFile => val schema = new StructType().add("a", IntegerType).add("b", TimestampType) From 3ff3d3fdcb33f3668a21ff137e1ccbda308bbe52 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 5 Mar 2017 20:20:10 +0900 Subject: [PATCH 5/5] Treat longer lengths of tokens as malformed records in CSV parser --- .../datasources/csv/UnivocityParser.scala | 71 ++++++++++--------- .../execution/datasources/csv/CSVSuite.scala | 16 +---- 2 files changed, 40 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index e42ea3fa391f5..b1d7702079ca0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -254,6 +254,17 @@ class UnivocityParser( } } + /** + * This function deals with the cases it fails to parse in PERMISSIVE mode. The failure reasons + * of this mode are 1) the longer lengths of tokens than expected or 2) format exceptions + * (e.g., NumberFormatException). + */ + private def failedRecordWithPermissiveMode(): Option[InternalRow] = { + val row = new GenericInternalRow(requiredSchema.length) + corruptFieldIndex.foreach(row(_) = UTF8String.fromString(getCurrentInput())) + Some(row) + } + private def convertWithParseMode( tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { if (options.dropMalformed && dataSchema.length != tokens.length) { @@ -271,43 +282,37 @@ class UnivocityParser( throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { - // If a length of parsed tokens is not equal to expected one, it makes the length the same - // with the expected. If the length is shorter, it adds extra tokens in the tail. - // If longer, it drops extra tokens. - // - // TODO: Revisit this; if a length of tokens does not match an expected length in the schema, - // we probably need to treat it as a malformed record. - // See an URL below for related discussions: - // https://github.com/apache/spark/pull/16928#discussion_r102657214 - val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) { - if (dataSchema.length > tokens.length) { + // If a length of parsed tokens is longer than expected, it treats them as malformed. + if (options.permissive && dataSchema.length < tokens.length) { + failedRecordWithPermissiveMode() + } else { + // If the length is shorter than expected, it adds extra tokens in the tail. + val checkedTokens = if (options.permissive && dataSchema.length > tokens.length) { tokens ++ new Array[String](dataSchema.length - tokens.length) } else { - tokens.take(dataSchema.length) + tokens } - } else { - tokens - } - try { - Some(convert(checkedTokens)) - } catch { - case NonFatal(e) if options.permissive => - val row = new GenericInternalRow(requiredSchema.length) - corruptFieldIndex.foreach(row(_) = UTF8String.fromString(getCurrentInput())) - Some(row) - case NonFatal(e) if options.dropMalformed => - if (numMalformedRecords < options.maxMalformedLogPerPartition) { - logWarning("Parse exception. " + - s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") - } - if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { - logWarning( - s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + - "found on this partition. Malformed records from now on will not be logged.") - } - numMalformedRecords += 1 - None + try { + Some(convert(checkedTokens)) + } catch { + case NonFatal(e) if options.permissive => + val row = new GenericInternalRow(requiredSchema.length) + corruptFieldIndex.foreach(row(_) = UTF8String.fromString(getCurrentInput())) + Some(row) + case NonFatal(e) if options.dropMalformed => + if (numMalformedRecords < options.maxMalformedLogPerPartition) { + logWarning("Parse exception. " + + s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") + } + if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } + numMalformedRecords += 1 + None + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index ba99a9c058629..bbbec937504ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -297,17 +297,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("test for tokens more than the fields in the schema") { - val cars = spark - .read - .format("csv") - .option("header", "false") - .option("comment", "~") - .load(testFile(carsMalformedFile)) - - verifyCars(cars, withHeader = false, checkTypes = false) - } - test("test with null quote character") { val cars = spark.read .format("csv") @@ -1116,14 +1105,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { assert(df2.schema === schema) } - test("SPARK-XXXXX regard as malformed records if the length is not equal to expected one") { + test("SPARK-19783 test for tokens more than the fields in the schema") { val columnNameOfCorruptRecord = "_unparsed" withTempPath { path => Seq("1,2", "1,2,3,4").toDF().write.text(path.getAbsolutePath) val schema = StructType( StructField("a", IntegerType, true) :: StructField("b", IntegerType, true) :: - StructField("c", IntegerType, true) :: StructField(columnNameOfCorruptRecord, StringType, true) :: Nil) val df = spark.read .schema(schema) @@ -1131,7 +1119,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) .csv(path.getAbsolutePath) - checkAnswer(df, Row(null, null, null, "1,2") :: Row(null, null, null, "1,2,3,4") :: Nil) + checkAnswer(df, Row(1, 2, null) :: Row(null, null, "1,2,3,4") :: Nil) } } }