From bc4ce261a2d13be0a31b18f006da79b55880d409 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 28 Jul 2018 17:31:20 +0200 Subject: [PATCH 01/22] Added a benchmark for count() --- .../datasources/json/JsonBenchmarks.scala | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala index 85cf054e51f6b..563e4814dc98a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.datasources.json import java.io.File import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.types.{LongType, StringType, StructType} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.types._ import org.apache.spark.util.{Benchmark, Utils} /** @@ -171,9 +172,49 @@ object JSONBenchmarks { } } + def countBenchmark(rowsNum: Int): Unit = { + val colsNum = 10 + val benchmark = new Benchmark(s"Count a dataset with $colsNum columns", rowsNum) + + withTempPath { path => + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType)) + val schema = StructType(fields) + val columnNames = schema.fieldNames + + spark.range(rowsNum) + .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*) + .write + .json(path.getAbsolutePath) + + val ds = spark.read.schema(schema).json(path.getAbsolutePath) + + benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ => + ds.select("*").filter((_: Row) => true).count() + } + benchmark.addCase(s"Select 1 column + count()", 3) { _ => + ds.select($"col1").filter((_: Row) => true).count() + } + benchmark.addCase(s"count()", 3) { _ => + ds.count() + } + + /* + Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz + + Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + --------------------------------------------------------------------------------------------- + Select 10 columns + count() 9809 / 9901 1.0 980.9 1.0X + Select 1 column + count() 8275 / 8291 1.2 827.5 1.2X + count() 7676 / 7715 1.3 767.6 1.3X 24344 / 24642 0.0 24343.8 3.3X + */ + benchmark.run() + } + } + def main(args: Array[String]): Unit = { schemaInferring(100 * 1000 * 1000) perlineParsing(100 * 1000 * 1000) perlineParsingOfWideColumn(10 * 1000 * 1000) + countBenchmark(10 * 1000 * 1000) } } From 91250d21d4bb451062873c59df6fe3b4669bc5ff Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 28 Jul 2018 17:50:15 +0200 Subject: [PATCH 02/22] Added a CSV benchmark for count() --- .../datasources/csv/CSVBenchmarks.scala | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index 1a3dacb8398e6..b3dd86dd2fabe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -119,8 +119,47 @@ object CSVBenchmarks { } } + def countBenchmark(rowsNum: Int): Unit = { + val colsNum = 10 + val benchmark = new Benchmark(s"Count a dataset with 
$colsNum columns", rowsNum) + + withTempPath { path => + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType)) + val schema = StructType(fields) + + spark.range(rowsNum) + .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*) + .write + .csv(path.getAbsolutePath) + + val ds = spark.read.schema(schema).csv(path.getAbsolutePath) + + benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ => + ds.select("*").filter((_: Row) => true).count() + } + benchmark.addCase(s"Select 1 column + count()", 3) { _ => + ds.select($"col1").filter((_: Row) => true).count() + } + benchmark.addCase(s"count()", 3) { _ => + ds.count() + } + + /* + Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz + + Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + --------------------------------------------------------------------------------------------- + Select 10 columns + count() 12903 / 12934 0.8 1290.3 1.0X + Select 1 column + count() 8056 / 8089 1.2 805.6 1.6X + count() 3309 / 3363 3.0 330.9 3.9X 24344 / 24642 0.0 24343.8 3.3X + */ + benchmark.run() + } + } + def main(args: Array[String]): Unit = { quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3) multiColumnsBenchmark(rowsNum = 1000 * 1000) + countBenchmark(10 * 1000 * 1000) } } From bdc5ea540b9eb62bb28606bdeb311ce5662e4bf7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 28 Jul 2018 17:59:44 +0200 Subject: [PATCH 03/22] Speed up count() --- .../org/apache/spark/sql/DataFrameReader.scala | 6 ++++-- .../datasources/FailureSafeParser.scala | 10 ++++++++-- .../datasources/csv/UnivocityParser.scala | 16 +++++----------- .../datasources/json/JsonDataSource.scala | 6 ++++-- .../execution/datasources/json/JsonSuite.scala | 2 +- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ec9352a7fa055..e91e3111e4dad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { input => rawParser.parse(input, createParser, UTF8String.fromString), parsedOptions.parseMode, schema, - parsedOptions.columnNameOfCorruptRecord) + parsedOptions.columnNameOfCorruptRecord, + optimizeEmptySchema = true) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming) @@ -521,7 +522,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { input => Seq(rawParser.parse(input)), parsedOptions.parseMode, schema, - parsedOptions.columnNameOfCorruptRecord) + parsedOptions.columnNameOfCorruptRecord, + optimizeEmptySchema = true) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index 43591a9ff524a..9d067332df148 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -28,7 +28,8 @@ class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, schema: StructType, - 
columnNameOfCorruptRecord: String) { + columnNameOfCorruptRecord: String, + optimizeEmptySchema: Boolean) { private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) @@ -56,9 +57,14 @@ class FailureSafeParser[IN]( } } + private val skipParsing = optimizeEmptySchema && schema.isEmpty def parse(input: IN): Iterator[InternalRow] = { try { - rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) + if (skipParsing) { + Iterator.single(InternalRow.empty) + } else { + rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) + } } catch { case e: BadRecordException => mode match { case PermissiveMode => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 79143cce4a380..15c48c7b1b9f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -203,19 +203,11 @@ class UnivocityParser( } } - private val doParse = if (requiredSchema.nonEmpty) { - (input: String) => convert(tokenizer.parseLine(input)) - } else { - // If `columnPruning` enabled and partition attributes scanned only, - // `schema` gets empty. - (_: String) => InternalRow.empty - } - /** * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). */ - def parse(input: String): InternalRow = doParse(input) + def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) private val getToken = if (options.columnPruning) { (tokens: Array[String], index: Int) => tokens(index) @@ -293,7 +285,8 @@ private[csv] object UnivocityParser { input => Seq(parser.convert(input)), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord) + parser.options.columnNameOfCorruptRecord, + optimizeEmptySchema = false) convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens => safeParser.parse(tokens) }.flatten @@ -341,7 +334,8 @@ private[csv] object UnivocityParser { input => Seq(parser.parse(input)), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord) + parser.options.columnNameOfCorruptRecord, + optimizeEmptySchema = true) filteredLines.flatMap(safeParser.parse) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 2fee2128ba1f9..b3b2846aaf631 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -139,7 +139,8 @@ object TextInputJsonDataSource extends JsonDataSource { input => parser.parse(input, textParser, textToUTF8String), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord) + parser.options.columnNameOfCorruptRecord, + optimizeEmptySchema = true) linesReader.flatMap(safeParser.parse) } @@ -223,7 +224,8 @@ object MultiLineJsonDataSource extends JsonDataSource { input => parser.parse[InputStream](input, streamParser, partitionedFileString), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord) + 
parser.options.columnNameOfCorruptRecord, + optimizeEmptySchema = false) safeParser.parse( CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 655f40ad549e6..796088d2e724a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2233,7 +2233,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .option("multiline", "true") .options(Map("encoding" -> "UTF-16BE")) .json(testFile(fileName)) - .count() + .collect() } val errMsg = exception.getMessage From d40f9bb229ab8ea9e2d95499ae203f7c41098bcd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 28 Jul 2018 18:00:17 +0200 Subject: [PATCH 04/22] Updating CSV and JSON benchmarks for count() --- .../sql/execution/datasources/csv/CSVBenchmarks.scala | 10 +++++----- .../execution/datasources/json/JsonBenchmarks.scala | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index b3dd86dd2fabe..a048a08a9d972 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -149,17 +149,17 @@ object CSVBenchmarks { Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------- - Select 10 columns + count() 12903 / 12934 0.8 1290.3 1.0X - Select 1 column + count() 8056 / 8089 1.2 805.6 1.6X - count() 3309 / 3363 3.0 330.9 3.9X 24344 / 24642 0.0 24343.8 3.3X + Select 10 columns + count() 12598 / 12740 0.8 1259.8 1.0X + Select 1 column + count() 7960 / 8175 1.3 796.0 1.6X + count() 2332 / 2386 4.3 233.2 5.4X 24344 / 24642 0.0 24343.8 3.3X */ benchmark.run() } } def main(args: Array[String]): Unit = { - quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3) - multiColumnsBenchmark(rowsNum = 1000 * 1000) +// quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3) +// multiColumnsBenchmark(rowsNum = 1000 * 1000) countBenchmark(10 * 1000 * 1000) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala index 563e4814dc98a..4d9f3f27490e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala @@ -203,9 +203,9 @@ object JSONBenchmarks { Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------- - Select 10 columns + count() 9809 / 9901 1.0 980.9 1.0X - Select 1 column + count() 8275 / 8291 1.2 827.5 1.2X - count() 7676 / 7715 1.3 767.6 1.3X 24344 / 24642 0.0 24343.8 3.3X + Select 10 columns + count() 9961 / 10006 1.0 996.1 1.0X + Select 1 column + count() 8355 / 8470 1.2 835.5 1.2X + count() 2104 / 2156 4.8 
210.4 4.7X 24344 / 24642 0.0 24343.8 3.3X */ benchmark.run() } From abd8572497ff742ef6ea942864195be75a40ca71 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 28 Jul 2018 18:23:03 +0200 Subject: [PATCH 05/22] Fix benchmark's output --- .../spark/sql/execution/datasources/csv/CSVBenchmarks.scala | 2 +- .../spark/sql/execution/datasources/json/JsonBenchmarks.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index a048a08a9d972..4d533f99bc27d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -151,7 +151,7 @@ object CSVBenchmarks { --------------------------------------------------------------------------------------------- Select 10 columns + count() 12598 / 12740 0.8 1259.8 1.0X Select 1 column + count() 7960 / 8175 1.3 796.0 1.6X - count() 2332 / 2386 4.3 233.2 5.4X 24344 / 24642 0.0 24343.8 3.3X + count() 2332 / 2386 4.3 233.2 5.4X */ benchmark.run() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala index 4d9f3f27490e6..a2b747eaab411 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala @@ -205,7 +205,7 @@ object JSONBenchmarks { --------------------------------------------------------------------------------------------- Select 10 columns + count() 9961 / 10006 1.0 996.1 1.0X Select 1 column + count() 8355 / 8470 1.2 835.5 1.2X - count() 2104 / 2156 4.8 210.4 4.7X 24344 / 24642 0.0 24343.8 3.3X + count() 2104 / 2156 4.8 210.4 4.7X */ benchmark.run() } From 359c4fcbfdb4f4e77faa3977f381dc8e819e46fa Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 28 Jul 2018 18:23:44 +0200 Subject: [PATCH 06/22] Uncomment other benchmarks --- .../spark/sql/execution/datasources/csv/CSVBenchmarks.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index 4d533f99bc27d..24f5f55d55485 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -158,8 +158,8 @@ object CSVBenchmarks { } def main(args: Array[String]): Unit = { -// quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3) -// multiColumnsBenchmark(rowsNum = 1000 * 1000) + quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3) + multiColumnsBenchmark(rowsNum = 1000 * 1000) countBenchmark(10 * 1000 * 1000) } } From 168eb993db4e847187e17c112ebbdccf7638d189 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 3 Aug 2018 17:51:21 +0200 Subject: [PATCH 07/22] A SQL config for bypassing parser in the case of empty schema --- .../apache/spark/sql/internal/SQLConf.scala | 10 +++++++ .../datasources/FailureSafeParser.scala | 6 ++++- .../datasources/json/JsonSuite.scala | 26 ++++++++++--------- 3 files changed, 29 insertions(+), 13 deletions(-) diff 
--git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a269e218c4efd..42482312b07ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1449,6 +1449,14 @@ object SQLConf { .intConf .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION) .createWithDefault(Deflater.DEFAULT_COMPRESSION) + + val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema") + .doc("If required schema passed to a text datasource is empty, the parameter controls " + + "invocation of underlying parser. For example, if it is set to false, uniVocity parser " + + "is invoke by CSV datasource or Jackson parser by JSON datasource. By default, it is set " + + "to true which means the parsers is not invoked for empty required schema.") + .booleanConf + .createWithDefault(true) } /** @@ -1839,6 +1847,8 @@ class SQLConf extends Serializable with Logging { def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL) + def bypassParserForEmptySchema: Boolean = getConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index 9d067332df148..73e7ffd77ae32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.SparkException + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String @@ -57,7 +59,9 @@ class FailureSafeParser[IN]( } } - private val skipParsing = optimizeEmptySchema && schema.isEmpty + private val skipParsing = { + SQLConf.get.bypassParserForEmptySchema && optimizeEmptySchema && schema.isEmpty + } def parse(input: IN): Iterator[InternalRow] = { try { if (skipParsing) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 796088d2e724a..cc42cbb6f2b20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2225,19 +2225,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-23723: specified encoding is not matched to actual encoding") { - val fileName = "test-data/utf16LE.json" - val schema = new StructType().add("firstName", StringType).add("lastName", StringType) - val exception = intercept[SparkException] { - spark.read.schema(schema) - .option("mode", "FAILFAST") - .option("multiline", "true") - .options(Map("encoding" -> "UTF-16BE")) - .json(testFile(fileName)) - .collect() - } - val errMsg = exception.getMessage + withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> 
"false") { + val fileName = "test-data/utf16LE.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val exception = intercept[SparkException] { + spark.read.schema(schema) + .option("mode", "FAILFAST") + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16BE")) + .json(testFile(fileName)) + .count() + } + val errMsg = exception.getMessage - assert(errMsg.contains("Malformed records are detected in record parsing")) + assert(errMsg.contains("Malformed records are detected in record parsing")) + } } def checkEncoding(expectedEncoding: String, pathToJsonFiles: String, From 05c8dbb3f15e5a210758b66e0c6d47a519a01b65 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 3 Aug 2018 18:19:42 +0200 Subject: [PATCH 08/22] Making Scala style checker happy --- .../spark/sql/execution/datasources/FailureSafeParser.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index 73e7ffd77ae32..5cc6c61dec019 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.SparkException - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util._ From 4a8a2ebabbd52c3856e5e44685339bbfdd4d14d7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 5 Aug 2018 16:26:56 +0200 Subject: [PATCH 09/22] Put config to the legacy namespace --- .../org/apache/spark/sql/internal/SQLConf.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6668880b22895..e2087ae7da399 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1477,13 +1477,14 @@ object SQLConf { .booleanConf .createWithDefault(false) - val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema") - .doc("If required schema passed to a text datasource is empty, the parameter controls " + - "invocation of underlying parser. For example, if it is set to false, uniVocity parser " + - "is invoke by CSV datasource or Jackson parser by JSON datasource. By default, it is set " + - "to true which means the parsers is not invoked for empty required schema.") - .booleanConf - .createWithDefault(true) + val BYPASS_PARSER_FOR_EMPTY_SCHEMA = + buildConf("spark.sql.legacy.bypassParserForEmptySchema") + .doc("If required schema passed to a text datasource is empty, the parameter controls " + + "invocation of underlying parser. For example, if it is set to false, uniVocity parser " + + "is invoke by CSV datasource or Jackson parser by JSON datasource. 
By default, it is set " + + "to true which means the parsers is not invoked for empty required schema.") + .booleanConf + .createWithDefault(true) } /** From 3f8fc5eb099ec7312efa8c541b0d03d2f1b0059b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 5 Aug 2018 16:45:41 +0200 Subject: [PATCH 10/22] Updating the migration guide --- docs/sql-programming-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index a1e019cbec4d2..00b8efa2de563 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. + - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of count(), for example. To set `true` to `spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior when the underlying parser is always invoked even for the empty schema. This option will be removed in Spark 3.0. 
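An illustrative snippet for the note above (spark-shell style, hypothetical file path; `spark.sql.legacy.bypassParserForEmptySchema` is the flag introduced earlier in this series, default `true`, meaning the parser is bypassed):

    // Default in this patch: count() does not invoke the CSV/JSON parser at all.
    spark.read.schema("a INT, b STRING").csv("/tmp/data.csv").count()

    // Legacy behaviour: run the parser even when the required schema is empty.
    spark.conf.set("spark.sql.legacy.bypassParserForEmptySchema", false)
    spark.read.schema("a INT, b STRING").csv("/tmp/data.csv").count()
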
## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above From da16234db91035987aad10194a7466678306f063 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 14 Aug 2018 13:31:30 +0200 Subject: [PATCH 11/22] Revert unnecessary changes --- .../datasources/json/JsonSuite.scala | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index cc42cbb6f2b20..41d5e1d5f35b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2223,23 +2223,20 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) } - test("SPARK-23723: specified encoding is not matched to actual encoding") { - withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "false") { - val fileName = "test-data/utf16LE.json" - val schema = new StructType().add("firstName", StringType).add("lastName", StringType) - val exception = intercept[SparkException] { - spark.read.schema(schema) - .option("mode", "FAILFAST") - .option("multiline", "true") - .options(Map("encoding" -> "UTF-16BE")) - .json(testFile(fileName)) - .count() - } - val errMsg = exception.getMessage - - assert(errMsg.contains("Malformed records are detected in record parsing")) + val fileName = "test-data/utf16LE.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val exception = intercept[SparkException] { + spark.read.schema(schema) + .option("mode", "FAILFAST") + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16BE")) + .json(testFile(fileName)) + .count() } + val errMsg = exception.getMessage + + assert(errMsg.contains("Malformed records are detected in record parsing")) } def checkEncoding(expectedEncoding: String, pathToJsonFiles: String, From 900bd0e25cabfcd4d0c0e4b3a8befdce2357dfe5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 14 Aug 2018 14:07:37 +0200 Subject: [PATCH 12/22] Test for malformed JSON input --- .../datasources/json/JsonSuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 41d5e1d5f35b6..1900efb82b881 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2489,4 +2489,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exception.getMessage.contains("encoding must not be included in the blacklist")) } } + + test("count() for malformed input") { + def countForMalformedJSON(expected: Long, input: Seq[String]): Unit = { + val schema = new StructType().add("a", StringType) + val strings = spark.createDataset(input) + val df = spark.read.schema(schema).json(strings) + + assert(df.count() == expected) + } + def checkCount(expected: Long): Unit = { + val validRec = """{"a":"b"}""" + val inputs = Seq( + Seq("{-}", validRec), + Seq(validRec, "?"), + Seq("}", validRec), + Seq(validRec, """{"a": [1, 2, 3]}"""), + Seq("""{"a": {"a": "b"}}""", validRec) + ) + inputs.foreach { input => + 
countForMalformedJSON(expected, input) + } + } + Seq("true", "false").foreach { bypassParser => + withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser) { + checkCount(2) + countForMalformedJSON(0, Seq("")) + } + } + } } From f5f13fa696eb888433e1fbc8a360353d032abc5f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 14 Aug 2018 14:15:40 +0200 Subject: [PATCH 13/22] Test for malformed CSV input --- .../execution/datasources/csv/CSVSuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 456b4535a0dcc..8141e547be09a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1641,4 +1641,33 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } } } + + test("count() for malformed input") { + def countForMalformedCSV(expected: Long, input: Seq[String]): Unit = { + val schema = new StructType().add("a", IntegerType) + val strings = spark.createDataset(input) + val df = spark.read.schema(schema).option("header", false).csv(strings) + + assert(df.count() == expected) + } + def checkCount(expected: Long): Unit = { + val validRec = "1" + val inputs = Seq( + Seq("{-}", validRec), + Seq(validRec, "?"), + Seq("0xAC", validRec), + Seq(validRec, "0.314"), + Seq("\\\\\\", validRec) + ) + inputs.foreach { input => + countForMalformedCSV(expected, input) + } + } + Seq("true", "false").foreach { bypassParser => + withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser) { + checkCount(2) + countForMalformedCSV(0, Seq("")) + } + } + } } From 12d50d03fbefaf5a86527418963bbc49654f4c87 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 15 Aug 2018 21:08:24 +0200 Subject: [PATCH 14/22] Handle errors caused by wrong input --- .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 6feea500b2aa0..984979ac5e9b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.json import java.io.{ByteArrayOutputStream, CharConversionException} +import java.nio.charset.MalformedInputException import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -402,7 +403,7 @@ class JacksonParser( } } } catch { - case e @ (_: RuntimeException | _: JsonProcessingException) => + case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) => // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. 
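A note on the JacksonParser change above: `java.nio.charset.MalformedInputException` extends `CharacterCodingException`, which is an `IOException` rather than a `RuntimeException`, so the charset decoder error raised when the configured encoding does not match the actual bytes was never matched by the previous `RuntimeException | JsonProcessingException` pattern. A minimal, self-contained sketch of that failure mode (illustrative only, not code from the patch):

    import java.nio.ByteBuffer
    import java.nio.charset.{Charset, CodingErrorAction, MalformedInputException}

    object MalformedInputDemo {
      def main(args: Array[String]): Unit = {
        // 0xFF can never occur in well-formed UTF-8, so decoding it with the
        // REPORT action raises MalformedInputException instead of replacing it.
        val bytes = Array(0x7B, 0xFF, 0x7D).map(_.toByte) // '{', invalid byte, '}'
        val decoder = Charset.forName("UTF-8").newDecoder()
          .onMalformedInput(CodingErrorAction.REPORT)
        try {
          decoder.decode(ByteBuffer.wrap(bytes))
        } catch {
          // An IOException subclass, hence the extra case added to JacksonParser.
          case e: MalformedInputException => println(s"caught: $e")
        }
      }
    }
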
From 2f7405976329307c49644af13255f84af906db15 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 15 Aug 2018 21:09:26 +0200 Subject: [PATCH 15/22] Adding tests for count and wrong encoding for input json --- .../datasources/json/JsonSuite.scala | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 1900efb82b881..6094db5059019 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2224,19 +2224,30 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-23723: specified encoding is not matched to actual encoding") { - val fileName = "test-data/utf16LE.json" - val schema = new StructType().add("firstName", StringType).add("lastName", StringType) - val exception = intercept[SparkException] { - spark.read.schema(schema) - .option("mode", "FAILFAST") - .option("multiline", "true") - .options(Map("encoding" -> "UTF-16BE")) - .json(testFile(fileName)) - .count() + def doCount(bypassParser: Boolean, multiLine: Boolean): Long = { + var result: Long = -1 + withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser.toString) { + val fileName = "test-data/utf16LE.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + result = spark.read.schema(schema) + .option("mode", "FAILFAST") + .option("multiline", multiLine) + .options(Map("encoding" -> "UTF-16BE", "lineSep" -> "\n")) + .json(testFile(fileName)) + .count() + } + result + } + + Seq((true, true), (false, true), (false, false)).foreach { case (bypassParser, multiLine) => + val exception = intercept[SparkException] { + doCount(bypassParser, multiLine) + } + val errMsg = exception.getMessage + assert(errMsg.contains("Malformed records are detected in record parsing")) } - val errMsg = exception.getMessage - assert(errMsg.contains("Malformed records are detected in record parsing")) + assert(doCount(bypassParser = true, multiLine = false) == 5) } def checkEncoding(expectedEncoding: String, pathToJsonFiles: String, From 6b98f3edf19b6ca0887224c598d6f3fa88a762d1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 15 Aug 2018 21:19:27 +0200 Subject: [PATCH 16/22] Migration guide is updated --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 32ebe5dc3f73b..c5310ceba642a 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1894,7 +1894,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. 
To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`. - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. - - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of count(), for example. To set `true` to `spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior when the underlying parser is always invoked even for the empty schema. This option will be removed in Spark 3.0. + - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of the count() action. For example, Spark 2.3 and earlier versions failed on JSON files with invalid encoding but Spark 2.4 returns total number of lines in the file. To restore the previous behavior when the underlying parser is always invoked even for the empty schema, set `true` to `spark.sql.legacy.bypassParserForEmptySchema`. This option will be removed in Spark 3.0. ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above From 6b34018fcedffa0033cb281d619af79e15d99585 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 16 Aug 2018 20:04:24 +0200 Subject: [PATCH 17/22] Fix a typo --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c493892524c7e..1606db3c2af3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1497,8 +1497,8 @@ object SQLConf { buildConf("spark.sql.legacy.bypassParserForEmptySchema") .doc("If required schema passed to a text datasource is empty, the parameter controls " + "invocation of underlying parser. For example, if it is set to false, uniVocity parser " + - "is invoke by CSV datasource or Jackson parser by JSON datasource. By default, it is set " + - "to true which means the parsers is not invoked for empty required schema.") + "is invoked by CSV datasource or Jackson parser by JSON datasource. 
By default, " + + "it is set to true which means the parsers is not invoked for empty required schema.") .booleanConf .createWithDefault(true) } From 32404050a00a699a544cbb5e226149b5ddc1c5ec Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 17 Aug 2018 23:29:25 +0200 Subject: [PATCH 18/22] Skip parsing for the PERMISSIVE mode only --- .../spark/sql/execution/datasources/FailureSafeParser.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index 5cc6c61dec019..e6793c5b1bd37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -59,8 +59,10 @@ class FailureSafeParser[IN]( } private val skipParsing = { - SQLConf.get.bypassParserForEmptySchema && optimizeEmptySchema && schema.isEmpty + SQLConf.get.bypassParserForEmptySchema && optimizeEmptySchema && mode == PermissiveMode && + schema.isEmpty } + def parse(input: IN): Iterator[InternalRow] = { try { if (skipParsing) { From 2d8e754e699076c8a5915e7faf971e4bd2a5c1fd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 17 Aug 2018 23:29:49 +0200 Subject: [PATCH 19/22] Revert test for invalid encoding --- .../datasources/json/JsonSuite.scala | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 6094db5059019..1900efb82b881 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2224,30 +2224,19 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-23723: specified encoding is not matched to actual encoding") { - def doCount(bypassParser: Boolean, multiLine: Boolean): Long = { - var result: Long = -1 - withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser.toString) { - val fileName = "test-data/utf16LE.json" - val schema = new StructType().add("firstName", StringType).add("lastName", StringType) - result = spark.read.schema(schema) - .option("mode", "FAILFAST") - .option("multiline", multiLine) - .options(Map("encoding" -> "UTF-16BE", "lineSep" -> "\n")) - .json(testFile(fileName)) - .count() - } - result - } - - Seq((true, true), (false, true), (false, false)).foreach { case (bypassParser, multiLine) => - val exception = intercept[SparkException] { - doCount(bypassParser, multiLine) - } - val errMsg = exception.getMessage - assert(errMsg.contains("Malformed records are detected in record parsing")) + val fileName = "test-data/utf16LE.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val exception = intercept[SparkException] { + spark.read.schema(schema) + .option("mode", "FAILFAST") + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16BE")) + .json(testFile(fileName)) + .count() } + val errMsg = exception.getMessage - assert(doCount(bypassParser = true, multiLine = false) == 5) + assert(errMsg.contains("Malformed records are detected in record parsing")) } def checkEncoding(expectedEncoding: String, pathToJsonFiles: String, 
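On the `mode == PermissiveMode` guard added above: only PERMISSIVE guarantees a (possibly all-null) result row for every malformed record, which is what makes emitting `InternalRow.empty` per input line equivalent to actually parsing it. FAILFAST and DROPMALFORMED still need the parser even for an empty required schema, otherwise malformed input would silently count as valid rows. A spark-shell style sketch of the expected user-visible contract (assumed inputs, not code from the patch):

    import org.apache.spark.SparkException
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.types._

    val schema = new StructType().add("a", StringType)
    val lines = spark.createDataset(Seq("""{"a":"b"}""", "{-}"))(Encoders.STRING)

    // PERMISSIVE (default): the malformed line still yields one row, so skipping
    // the parser cannot change the result of count().
    spark.read.schema(schema).json(lines).count() // 2

    // FAILFAST: must keep throwing on the malformed line, hence no skipping.
    try {
      spark.read.schema(schema).option("mode", "FAILFAST").json(lines).count()
    } catch {
      case e: SparkException => println("FAILFAST still detects the malformed record")
    }
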
From 96a94ccaed1f68fa7eaf3fc286540e531d9a9506 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 17 Aug 2018 23:35:53 +0200 Subject: [PATCH 20/22] Removing an unnecessary note in migration guide --- docs/sql-programming-guide.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index c5310ceba642a..d9ebc3cfe4674 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1894,7 +1894,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`. - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. - - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of the count() action. For example, Spark 2.3 and earlier versions failed on JSON files with invalid encoding but Spark 2.4 returns total number of lines in the file. To restore the previous behavior when the underlying parser is always invoked even for the empty schema, set `true` to `spark.sql.legacy.bypassParserForEmptySchema`. This option will be removed in Spark 3.0. ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above From 50a0ef025049557e1ec3ee1bf9fc79ed47625e18 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 18 Aug 2018 14:35:20 +0200 Subject: [PATCH 21/22] Removing the SQL config --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 ----------- .../sql/execution/datasources/FailureSafeParser.scala | 5 +---- .../sql/execution/datasources/csv/CSVSuite.scala | 9 +++------ .../sql/execution/datasources/json/JsonSuite.scala | 9 +++------ 4 files changed, 7 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1606db3c2af3b..dbb5bb43b4f1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1492,15 +1492,6 @@ object SQLConf { "This usually speeds up commands that need to list many directories.") .booleanConf .createWithDefault(true) - - val BYPASS_PARSER_FOR_EMPTY_SCHEMA = - buildConf("spark.sql.legacy.bypassParserForEmptySchema") - .doc("If required schema passed to a text datasource is empty, the parameter controls " + - "invocation of underlying parser. 
For example, if it is set to false, uniVocity parser " + - "is invoked by CSV datasource or Jackson parser by JSON datasource. By default, " + - "it is set to true which means the parsers is not invoked for empty required schema.") - .booleanConf - .createWithDefault(true) } /** @@ -1903,8 +1894,6 @@ class SQLConf extends Serializable with Logging { def parallelFileListingInStatsComputation: Boolean = getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION) - def bypassParserForEmptySchema: Boolean = getConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA) - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index e6793c5b1bd37..d08794487ab84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -58,10 +58,7 @@ class FailureSafeParser[IN]( } } - private val skipParsing = { - SQLConf.get.bypassParserForEmptySchema && optimizeEmptySchema && mode == PermissiveMode && - schema.isEmpty - } + private val skipParsing = optimizeEmptySchema && mode == PermissiveMode && schema.isEmpty def parse(input: IN): Iterator[InternalRow] = { try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 8141e547be09a..14840e59a1052 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1663,11 +1663,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te countForMalformedCSV(expected, input) } } - Seq("true", "false").foreach { bypassParser => - withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser) { - checkCount(2) - countForMalformedCSV(0, Seq("")) - } - } + + checkCount(2) + countForMalformedCSV(0, Seq("")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 1900efb82b881..3e4cc8f166279 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2511,11 +2511,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { countForMalformedJSON(expected, input) } } - Seq("true", "false").foreach { bypassParser => - withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser) { - checkCount(2) - countForMalformedJSON(0, Seq("")) - } - } + + checkCount(2) + countForMalformedJSON(0, Seq("")) } } From 050c8ce73f35791c4adb1a4d11f120288865cae8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 18 Aug 2018 14:53:11 +0200 Subject: [PATCH 22/22] Renaming optimizeEmptySchema to isMultiLine --- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 4 ++-- .../spark/sql/execution/datasources/FailureSafeParser.scala | 4 ++-- .../spark/sql/execution/datasources/csv/UnivocityParser.scala | 4 ++-- .../spark/sql/execution/datasources/json/JsonDataSource.scala | 4 ++-- 4 files 
changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 27621afb454ff..1b3a9fc91d198 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -451,7 +451,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions.parseMode, schema, parsedOptions.columnNameOfCorruptRecord, - optimizeEmptySchema = true) + parsedOptions.multiLine) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming) @@ -523,7 +523,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions.parseMode, schema, parsedOptions.columnNameOfCorruptRecord, - optimizeEmptySchema = true) + parsedOptions.multiLine) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index d08794487ab84..90e81661bae7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -30,7 +30,7 @@ class FailureSafeParser[IN]( mode: ParseMode, schema: StructType, columnNameOfCorruptRecord: String, - optimizeEmptySchema: Boolean) { + isMultiLine: Boolean) { private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) @@ -58,7 +58,7 @@ class FailureSafeParser[IN]( } } - private val skipParsing = optimizeEmptySchema && mode == PermissiveMode && schema.isEmpty + private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty def parse(input: IN): Iterator[InternalRow] = { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 15c48c7b1b9f5..e15af425b2649 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -286,7 +286,7 @@ private[csv] object UnivocityParser { parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord, - optimizeEmptySchema = false) + parser.options.multiLine) convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens => safeParser.parse(tokens) }.flatten @@ -335,7 +335,7 @@ private[csv] object UnivocityParser { parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord, - optimizeEmptySchema = true) + parser.options.multiLine) filteredLines.flatMap(safeParser.parse) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 931938108343b..76f58371ae264 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -140,7 +140,7 @@ object TextInputJsonDataSource extends JsonDataSource { parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord, - optimizeEmptySchema = true) + parser.options.multiLine) linesReader.flatMap(safeParser.parse) } @@ -225,7 +225,7 @@ object MultiLineJsonDataSource extends JsonDataSource { parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord, - optimizeEmptySchema = false) + parser.options.multiLine) safeParser.parse( CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
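
Net effect of the series, as an end-to-end sketch (hypothetical paths and schema; the fast path applies only to line-delimited reads in the default PERMISSIVE mode when the pushed-down schema is empty):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    object CountFastPathExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]")
          .appName("count-fast-path").getOrCreate()
        val schema = new StructType().add("col0", IntegerType).add("col1", IntegerType)

        // Empty required schema + PERMISSIVE + line-delimited input:
        // FailureSafeParser emits one empty row per record without calling Jackson.
        val fast = spark.read.schema(schema).json("/tmp/rows.json").count()

        // multiLine reads pass isMultiLine = true, so the parser still runs
        // (one file may hold any number of records, or a malformed document).
        val multi = spark.read.schema(schema).option("multiLine", "true")
          .json("/tmp/rows.json").count()

        // Same trick as the benchmarks above: a no-op typed filter keeps one column
        // in the required schema, so this path still parses every record.
        val oneColumn = spark.read.schema(schema).json("/tmp/rows.json")
          .select("col0").filter((_: Row) => true).count()

        println((fast, multi, oneColumn))
        spark.stop()
      }
    }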