From 31cb534d22cbab31fad5fc44115e67ef973420ea Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 4 Nov 2018 00:24:01 +0300
Subject: [PATCH 1/8] Eliminate producing nulls by JSON parser

---
 .../expressions/jsonExpressions.scala         | 20 ++++++++++++-------
 .../sql/catalyst/json/JacksonParser.scala     |  2 +-
 .../expressions/JsonExpressionsSuite.scala    |  2 +-
 .../datasources/json/JsonSuite.scala          | 12 ++++++++---
 4 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index eafcb6161036e..61170c81d4c1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -552,13 +552,19 @@ case class JsonToStructs(
 
   // This converts parsed rows to the desired output by the given schema.
   @transient
-  lazy val converter = nullableSchema match {
-    case _: StructType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
-    case _: ArrayType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
-    case _: MapType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
+  lazy val converter = (rows: Iterator[InternalRow]) => {
+    if (rows.hasNext) {
+      val result = rows.next()
+      // JSON's parser produces one record only.
+      assert(!rows.hasNext)
+      nullableSchema match {
+        case _: StructType => result
+        case _: ArrayType => result.getArray(0)
+        case _: MapType => result.getMap(0)
+      }
+    } else {
+      throw new IllegalArgumentException("Expected one row from JSON parser.")
+    }
   }
 
   val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 57c7f2faf3107..773ff5a7a4013 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -399,7 +399,7 @@ class JacksonParser(
       // a null first token is equivalent to testing for input.trim.isEmpty
       // but it works on any token stream and not just strings
       parser.nextToken() match {
-        case null => Nil
+        case null => throw new RuntimeException("Not found any JSON token")
         case _ => rootConverter.apply(parser) match {
           case null => throw new RuntimeException("Root converter returned null")
           case rows => rows
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 304642161146b..5ff2857d0569f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -546,7 +546,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
-      null
+      InternalRow(null)
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 06032ded42a53..9ea9189cdf7f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         Row(null, null, null),
         Row(null, null, null),
         Row(null, null, null),
+        Row(null, null, null),
         Row("str_a_4", "str_b_4", "str_c_4"),
         Row(null, null, null))
     )
@@ -1136,6 +1137,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.select($"a", $"b", $"c", $"_unparsed"),
       Row(null, null, null, "{") ::
+        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1150,6 +1152,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
       Row("{") ::
+        Row("") ::
         Row("""{"a":1, b:2}""") ::
         Row("""{"a":{, b:3}""") ::
         Row("]") :: Nil
@@ -1171,6 +1174,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.selectExpr("a", "b", "c", "_malformed"),
       Row(null, null, null, "{") ::
+        Row(null, null, null, "") ::
        Row(null, null, null, """{"a":1, b:2}""") ::
        Row(null, null, null, """{"a":{, b:3}""") ::
        Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
+        .repartition(1)
         .write
         .option("compression", "GzIp")
         .text(path)
@@ -1838,6 +1843,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
+        .repartition(1)
         .write
         .text(path)
 
@@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .text(path)
 
       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount)
+      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1905,7 +1911,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           F.count($"dummy").as("valid"),
          F.count($"_corrupt_record").as("corrupt"),
          F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 4, 6))
+      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
    }
  }
@@ -2513,7 +2519,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
 
     checkCount(2)
-    countForMalformedJSON(0, Seq(""))
+    countForMalformedJSON(1, Seq(""))
   }
 
   test("SPARK-25040: empty strings should be disallowed") {
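To illustrate the user-visible effect of this patch, here is a minimal, hypothetical sketch (the session setup and names are illustrative, not part of the patch): with the default `PERMISSIVE` mode, input without a root JSON token now parses to a row of nulls instead of a `null` struct.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object EmptyJsonInputDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    val schema = StructType(StructField("a", IntegerType) :: Nil)
    // " " contains no root JSON token. Before this patch JacksonParser
    // returned Nil for it and the converter mapped that to null; now it is
    // treated as a bad record, so PERMISSIVE (the default) yields Row(null).
    val parsed = Seq(" ").toDF("json")
      .select(from_json($"json", schema).as("parsed"))
      .collect()
    println(parsed.mkString(", ")) // expected: [[null]] rather than [null]

    spark.stop()
  }
}
```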
From 0589d9195ef396b2a94bb2dfdc3000ffc8e5555b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 4 Nov 2018 11:00:23 +0300
Subject: [PATCH 2/8] Updating the migration guide

---
 docs/sql-migration-guide-upgrade.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index c9685b866774f..dab3aa6528171 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode is `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.
 
+  - In Spark version 2.4 and earlier, the JSON data source and the `from_json` function produced `null`s if there was no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
+
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
 
 ## Upgrading From Spark SQL 2.3 to 2.4
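The new migration-guide entry can be exercised through the `mode` option of `from_json`; a short sketch, assuming the `spark` session and implicits from the previous example and an illustrative schema:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(
  StructField("key", StringType) :: StructField("value", IntegerType) :: Nil)
val df = Seq(" ").toDF("json")

// PERMISSIVE (the default): the bad record becomes a row of nulls,
// i.e. Row(null, null) for the schema `key STRING, value INT`.
df.select(from_json($"json", schema, Map("mode" -> "PERMISSIVE"))).show()

// FAILFAST: the same input should make the query fail with a parse error.
df.select(from_json($"json", schema, Map("mode" -> "FAILFAST"))).show()
```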
From 0aa4f609aa8c8e99121eb8ea2a966726e4846251 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 4 Nov 2018 14:47:16 +0300
Subject: [PATCH 3/8] Addressing Hyukjin's review comments

---
 .../sql/catalyst/expressions/jsonExpressions.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 61170c81d4c1c..414e8e762c8be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -550,6 +550,12 @@ case class JsonToStructs(
       s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
   }
 
+  private val castRow = nullableSchema match {
+    case _: StructType => (row: InternalRow) => row
+    case _: ArrayType => (row: InternalRow) => row.getArray(0)
+    case _: MapType => (row: InternalRow) => row.getMap(0)
+  }
+
   // This converts parsed rows to the desired output by the given schema.
   @transient
   lazy val converter = (rows: Iterator[InternalRow]) => {
@@ -557,11 +563,7 @@ case class JsonToStructs(
       val result = rows.next()
       // JSON's parser produces one record only.
       assert(!rows.hasNext)
-      nullableSchema match {
-        case _: StructType => result
-        case _: ArrayType => result.getArray(0)
-        case _: MapType => result.getMap(0)
-      }
+      castRow(result)
     } else {
       throw new IllegalArgumentException("Expected one row from JSON parser.")
    }
From d1bad7cf94724072cfbdfc19b8a1f90e58848fb3 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 4 Nov 2018 15:04:59 +0300
Subject: [PATCH 4/8] Removing an irrelevant test

---
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 631ab1b7ece7f..d343755edc922 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row("1"), Row("2")))
   }
 
-  test("SPARK-11226 Skip empty line in json file") {
-    spark.read
-      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
-      .createOrReplaceTempView("d")
-
-    checkAnswer(
-      sql("select count(1) from d"),
-      Seq(Row(3)))
-  }
-
   test("SPARK-8828 sum should return null if all input values are null") {
     checkAnswer(
       sql("select sum(a), avg(a) from allNulls"),

From c4d6a8066031c4f1b0f9323f9998f0f0b10b74c7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 4 Nov 2018 19:41:35 +0300
Subject: [PATCH 5/8] Fix for R test

---
 R/pkg/tests/fulltests/test_sparkSQL.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 58e0a54d2aacc..c0beb7691beb2 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1674,7 +1674,7 @@ test_that("column functions", {
 
   # check for unparseable
   df <- as.DataFrame(list(list("a" = "")))
-  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
 
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
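The R fix above tracks the new result shape: parsing an unparseable string now yields a struct whose fields are `null` rather than a `null` struct, so the assertion drills into field `a`. A Scala analogue of that expectation, as a sketch assuming the session and implicits from the first example:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(StructField("a", StringType) :: Nil)
val rows = Seq("").toDF("a")
  .select(from_json($"a", schema).as("s"))
  .select($"s.a") // drill into the field, mirroring `[[1]][[1]]$a` in R
  .collect()
// rows(0).isNullAt(0) should hold: the field is null, the struct itself is not.
```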
From a7e016acf1e8495f5a89f752d649a4f1a35b9eb1 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 6 Nov 2018 22:01:44 +0300
Subject: [PATCH 6/8] Addressing Attila's review comments

---
 .../spark/sql/catalyst/expressions/jsonExpressions.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 414e8e762c8be..a0fa7d92a2d83 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -550,15 +550,15 @@ case class JsonToStructs(
       s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
   }
 
-  private val castRow = nullableSchema match {
+  @transient
+  private lazy val castRow = nullableSchema match {
     case _: StructType => (row: InternalRow) => row
     case _: ArrayType => (row: InternalRow) => row.getArray(0)
     case _: MapType => (row: InternalRow) => row.getMap(0)
   }
 
   // This converts parsed rows to the desired output by the given schema.
-  @transient
-  lazy val converter = (rows: Iterator[InternalRow]) => {
+  private def convertRow(rows: Iterator[InternalRow]) = {
     if (rows.hasNext) {
       val result = rows.next()
       // JSON's parser produces one record only.
@@ -599,7 +599,7 @@ case class JsonToStructs(
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
-    converter(parser.parse(json.asInstanceOf[UTF8String]))
+    convertRow(parser.parse(json.asInstanceOf[UTF8String]))
   }
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil

From 6a8cac3c908fa667aa443fdb9c68c476a22d0259 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 21 Nov 2018 21:30:55 +0100
Subject: [PATCH 7/8] Updating the migration guide

---
 docs/sql-migration-guide-upgrade.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index b423c2b6f3020..bbe5049b190fc 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -15,7 +15,7 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode is `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.
 
-  - In Spark version 2.4 and earlier, the JSON data source and the `from_json` function produced `null`s if there was no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
+  - In Spark version 2.4 and earlier, the `from_json` function produced `null`s if there was no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
 
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
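A note on patch 6 above: replacing `private val castRow` with `@transient private lazy val castRow` follows a common Spark idiom. Expressions are serialized to executors, and a `@transient lazy val` is skipped during serialization and re-derived from the schema on first use in each JVM, so the function value never has to travel. A generic sketch of the idiom, with all names hypothetical:

```scala
class LineSplitter(val delimiter: String) extends Serializable {
  // Excluded from serialization; rebuilt lazily after deserialization,
  // so only `delimiter` travels across the wire.
  @transient private lazy val pattern: java.util.regex.Pattern =
    java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(delimiter))

  def split(line: String): Array[String] = pattern.split(line)
}
```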
-  - In Spark version 2.4 and earlier, the `from_json` function produced `null`s if there was no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
+  - In Spark version 2.4 and earlier, the `from_json` function produced `null`s for JSON strings without a valid root JSON token (` ` for example), and the JSON datasource skipped such input regardless of its mode. Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
 
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
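The datasource half of this final entry matches the updated `countForMalformedJSON(1, Seq(""))` expectation in the test suite; a sketch, assuming the session and implicits from the first example:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(StructField("a", StringType) :: Nil)

// An empty line has no root JSON token. Spark 2.4 silently skipped it in
// every mode; with this change it counts as a bad record, so PERMISSIVE
// keeps it as a row of nulls.
val count = spark.read
  .schema(schema)
  .json(Seq("").toDS())
  .count()
// count should be 1 instead of 0.
```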