From 506417ed4b643298560a66c043f7b31beb489da3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 10 Nov 2018 20:49:12 +0100 Subject: [PATCH 01/12] Test for parsing decimals using locale --- .../expressions/JsonExpressionsSuite.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 6ee8c74010d3d..e8bdc06b117e5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions -import java.text.SimpleDateFormat +import java.text.{DecimalFormat, DecimalFormatSymbols, SimpleDateFormat} import java.util.{Calendar, Locale} import org.scalatest.exceptions.TestFailedException @@ -754,4 +754,17 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with InternalRow(17836)) // number of days from 1970-01-01 } } + + test("parse decimals using locale") { + Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach { langTag => + val schema = new StructType().add("d", DecimalType(10, 5)) + val options = Map("locale" -> langTag) + val expected = Decimal(1000.001, 10, 5) + val df = new DecimalFormat("", new DecimalFormatSymbols(Locale.forLanguageTag(langTag))) + val input = s"""{"d":"${df.format(expected.toBigDecimal)}"}""" + checkEvaluation( + JsonToStructs(schema, options, Literal.create(input), gmtId), + InternalRow(expected)) + } + } } From ac25fb6ed1d3d6689ad8841476c025848c87f2a3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 10 Nov 2018 20:51:48 +0100 Subject: [PATCH 02/12] Parsing decimals using locale --- .../apache/spark/sql/catalyst/json/JacksonParser.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 57c7f2faf3107..10e2d230b019b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.json import java.io.{ByteArrayOutputStream, CharConversionException} import java.nio.charset.MalformedInputException +import java.text.{DecimalFormat, DecimalFormatSymbols} import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -135,6 +136,12 @@ class JacksonParser( } } + private val decimalParser = { + val df = new DecimalFormat("", new DecimalFormatSymbols(options.locale)) + df.setParseBigDecimal(true) + df + } + /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. 
@@ -261,6 +268,9 @@ class JacksonParser( (parser: JsonParser) => parseJsonToken[Decimal](parser, dataType) { case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) + case VALUE_STRING => + val bigDecimal = decimalParser.parse(parser.getText).asInstanceOf[java.math.BigDecimal] + Decimal(bigDecimal, dt.precision, dt.scale) } case st: StructType => From b784003078270a46aaf8aceb2d86dd9f13f3500c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 10 Nov 2018 20:54:00 +0100 Subject: [PATCH 03/12] Updating the migration guide --- docs/sql-migration-guide-upgrade.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 50458e96f7c3f..2eb544ea07737 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -9,6 +9,8 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.4 to 3.0 + - Since Spark 3.0, to parse decimals in locale specific format from JSON, set the `locale` option to proper value. + - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder comes to not update the configurations. This is the same behavior as Java/Scala API in 2.3 and above. If you want to update them, you need to update them prior to creating a `SparkSession`. - In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`. From 722e135cf3b94e22079fd9a0fe1d309a04e76a64 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 20:58:39 +0100 Subject: [PATCH 04/12] Add SQL config spark.sql.legacy.decimalParsing.enabled --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7bcf21595ce5a..d789fec9d5755 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1610,6 +1610,13 @@ object SQLConf { """ "... N more fields" placeholder.""") .intConf .createWithDefault(25) + + val LEGACY_DECIMAL_PARSING_ENABLED = buildConf("spark.sql.legacy.decimalParsing.enabled") + .doc("If it is set to false, it enables parsing decimals in locale specific formats. " + + "To switch back to previous behaviour when parsing was performed by java.math.BigDecimal " + + "and all commas were removed from the input, set the flag to true.") + .booleanConf + .createWithDefault(false) } /** @@ -2030,6 +2037,8 @@ class SQLConf extends Serializable with Logging { def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS) + def legacyDecimalParsing: Boolean = getConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ From dc6c0ac0a97b31441d99ff1dd71608ae5e2eca73 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 21:00:31 +0100 Subject: [PATCH 05/12] Updating the migration guide --- docs/sql-migration-guide-upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 2681d0d25e352..1c04b6fada9f8 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -9,7 +9,7 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.4 to 3.0 - - Since Spark 3.0, to parse decimals in locale specific format from JSON, set the `locale` option to proper value. + - In Spark version 2.4 and earlier, accepted format of decimals parsed from JSON is an optional sign ('+' or '-'), followed by a sequence of zero or more decimal digits, optionally followed by a fraction, optionally followed by an exponent. Any commas were removed from the input before parsing. Since Spark 3.0, format varies and depends on locale which can be set via JSON option `locale`. The default locale is `en-US`. To switch back to previous behavior, set `spark.sql.legacy.decimalParsing.enabled` to `true`. - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder comes to not update the configurations. This is the same behavior as Java/Scala API in 2.3 and above. If you want to update them, you need to update them prior to creating a `SparkSession`. From f15b1817fb51d453487665122473855712214692 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 21:38:24 +0100 Subject: [PATCH 06/12] Added a test for parsing --- .../expressions/JsonExpressionsSuite.scala | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index c54ccda3087f6..799afa55ddd55 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -767,15 +767,35 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with } test("parse decimals using locale") { - Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach { langTag => + def checkDecimalParsing(langTag: String): Unit = { + val decimalVal = new java.math.BigDecimal("1000.001") + val decimalType = new DecimalType(10, 5) + val expected = Decimal(decimalVal, decimalType.precision, decimalType.scale) + val decimalFormat = new DecimalFormat("", + new DecimalFormatSymbols(Locale.forLanguageTag(langTag))) + val input = s"""{"d": "${decimalFormat.format(expected.toBigDecimal)}"}""" val schema = new StructType().add("d", DecimalType(10, 5)) val options = Map("locale" -> langTag) - val expected = Decimal(1000.001, 10, 5) - val df = new DecimalFormat("", new DecimalFormatSymbols(Locale.forLanguageTag(langTag))) - val input = s"""{"d":"${df.format(expected.toBigDecimal)}"}""" + checkEvaluation( JsonToStructs(schema, options, Literal.create(input), gmtId), InternalRow(expected)) } + + 
withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "false") { + Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalParsing) + } + + withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { + Seq("en-US", "ko-KR").foreach(checkDecimalParsing) + } + + withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { + Seq("ru-RU").foreach { langTag => + intercept[NumberFormatException] { + checkDecimalParsing(langTag) + } + } + } } } From ab781d54e4f7604d64c72c3c383c549abab0a9a9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 21:52:03 +0100 Subject: [PATCH 07/12] Fix test --- .../spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 799afa55ddd55..1ca242c2fbd6c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -792,7 +792,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { Seq("ru-RU").foreach { langTag => - intercept[NumberFormatException] { + intercept[TestFailedException] { checkDecimalParsing(langTag) } } From 163a8b9d7d017409ae4dfa40e492680bf0e4f935 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 21:52:26 +0100 Subject: [PATCH 08/12] Create getDecimalParser --- .../sql/catalyst/expressions/ExprUtils.scala | 21 +++++++++++++++++++ .../sql/catalyst/json/JacksonParser.scala | 8 +++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index 89e9071324eff..8d9d40290e606 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.catalyst.expressions +import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition} +import java.util.Locale + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.util.ArrayBasedMapData import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType} @@ -83,4 +86,22 @@ object ExprUtils { } } } + + def getDecimalParser(useLegacyParser: Boolean, locale: Locale): String => java.math.BigDecimal = { + if (useLegacyParser) { + (s: String) => new java.math.BigDecimal(s.replaceAll(",", "")) + } else { + val decimalFormat = new DecimalFormat("", new DecimalFormatSymbols(locale)) + decimalFormat.setParseBigDecimal(true) + (s: String) => { + val pos = new ParsePosition(0) + val result = decimalFormat.parse(s, pos).asInstanceOf[java.math.BigDecimal] + if (pos.getIndex() != s.length() || pos.getErrorIndex() != -1) { + throw new IllegalArgumentException("Cannot parse any decimal"); + } else { + result + } + } + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index bcb839c87f00b..0e662b759caad 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.json import java.io.{ByteArrayOutputStream, CharConversionException} import java.nio.charset.MalformedInputException -import java.text.{DecimalFormat, DecimalFormatSymbols} import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -30,6 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -137,9 +137,7 @@ class JacksonParser( } private val decimalParser = { - val df = new DecimalFormat("", new DecimalFormatSymbols(options.locale)) - df.setParseBigDecimal(true) - df + ExprUtils.getDecimalParser(SQLConf.get.legacyDecimalParsing, options.locale) } /** @@ -269,7 +267,7 @@ class JacksonParser( case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) case VALUE_STRING => - val bigDecimal = decimalParser.parse(parser.getText).asInstanceOf[java.math.BigDecimal] + val bigDecimal = decimalParser(parser.getText) Decimal(bigDecimal, dt.precision, dt.scale) } From 8fb65c0db85f4bd2f76d473c5e31e772ff0d4c1d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 23:11:42 +0100 Subject: [PATCH 09/12] Add a test for inferring decimals --- .../expressions/JsonExpressionsSuite.scala | 47 ++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 1ca242c2fbd6c..931a72ca80534 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -766,16 +766,22 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with expectedErrMsg = "The field for corrupt records must be string type and nullable") } + def decimalInput(langTag: String): (Decimal, String) = { + val decimalVal = new java.math.BigDecimal("1000.001") + val decimalType = new DecimalType(10, 5) + val expected = Decimal(decimalVal, decimalType.precision, decimalType.scale) + val decimalFormat = new DecimalFormat("", + new DecimalFormatSymbols(Locale.forLanguageTag(langTag))) + val input = s"""{"d": "${decimalFormat.format(expected.toBigDecimal)}"}""" + + (expected, input) + } + test("parse decimals using locale") { def checkDecimalParsing(langTag: String): Unit = { - val decimalVal = new java.math.BigDecimal("1000.001") - val decimalType = new DecimalType(10, 5) - val expected = Decimal(decimalVal, decimalType.precision, decimalType.scale) - val decimalFormat = new DecimalFormat("", - new DecimalFormatSymbols(Locale.forLanguageTag(langTag))) - val input = s"""{"d": "${decimalFormat.format(expected.toBigDecimal)}"}""" val schema = new StructType().add("d", DecimalType(10, 5)) val options = Map("locale" -> langTag) + val (expected, input) = decimalInput(langTag) checkEvaluation( JsonToStructs(schema, options, Literal.create(input), gmtId), @@ -798,4 +804,33 @@ class 
JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with } } } + + test("inferring the decimal type using locale") { + def checkDecimalInfer(langTag: String, expectedType: String): Unit = { + val options = Map("locale" -> langTag, "prefersDecimal" -> "true") + val (_, input) = decimalInput(langTag) + + checkEvaluation( + SchemaOfJson(Literal.create(input), options), + expectedType) + } + + withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "false") { + Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach { + checkDecimalInfer(_, """struct""") + } + } + + withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { + Seq("en-US", "ko-KR").foreach { + checkDecimalInfer(_, """struct""") + } + } + + withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { + Seq("ru-RU").foreach { + checkDecimalInfer(_, """struct""") + } + } + } } From 7e3a2906a96894cadc58771131d07d06ba265382 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 23:12:35 +0100 Subject: [PATCH 10/12] Change JsonSuite to adopt it for JsonInferSchema class --- .../execution/datasources/json/JsonSuite.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 9ea9189cdf7f4..ee31077e12ef3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -31,8 +31,7 @@ import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{functions => F, _} -import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions} -import org.apache.spark.sql.catalyst.json.JsonInferSchema.compatibleType +import org.apache.spark.sql.catalyst.json._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.ExternalRDD import org.apache.spark.sql.execution.datasources.DataSource @@ -118,10 +117,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("Get compatible type") { def checkDataType(t1: DataType, t2: DataType, expected: DataType) { - var actual = compatibleType(t1, t2) + var actual = JsonInferSchema.compatibleType(t1, t2) assert(actual == expected, s"Expected $expected as the most general data type for $t1 and $t2, found $actual") - actual = compatibleType(t2, t1) + actual = JsonInferSchema.compatibleType(t2, t1) assert(actual == expected, s"Expected $expected as the most general data type for $t1 and $t2, found $actual") } @@ -1373,9 +1372,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonInferSchema.infer on empty RDD") { // This is really a test that it doesn't throw an exception - val emptySchema = JsonInferSchema.infer( + val options = new JSONOptions(Map.empty[String, String], "GMT") + val emptySchema = new JsonInferSchema(options).infer( empty.rdd, - new JSONOptions(Map.empty[String, String], "GMT"), CreateJacksonParser.string) assert(StructType(Seq()) === emptySchema) } @@ -1400,9 +1399,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-8093 Erase empty structs") { - val emptySchema = JsonInferSchema.infer( + val options = new 
JSONOptions(Map.empty[String, String], "GMT") + val emptySchema = new JsonInferSchema(options).infer( emptyRecords.rdd, - new JSONOptions(Map.empty[String, String], "GMT"), CreateJacksonParser.string) assert(StructType(Seq()) === emptySchema) } From 83920b25f586dc242841ff0a73105ae9e43295ed Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 23:13:01 +0100 Subject: [PATCH 11/12] Inferring decimals from JSON --- .../expressions/jsonExpressions.scala | 7 +- .../sql/catalyst/json/JacksonParser.scala | 2 +- .../sql/catalyst/json/JsonInferSchema.scala | 91 +++++++++++-------- .../datasources/json/JsonDataSource.scala | 4 +- 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 47304d835fdf8..e0cab537ce1c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -23,12 +23,10 @@ import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -775,6 +773,9 @@ case class SchemaOfJson( factory } + @transient + private lazy val jsonInferSchema = new JsonInferSchema(jsonOptions) + @transient private lazy val json = child.eval().asInstanceOf[UTF8String] @@ -787,7 +788,7 @@ case class SchemaOfJson( override def eval(v: InternalRow): Any = { val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => parser.nextToken() - inferField(parser, jsonOptions) + jsonInferSchema.inferField(parser) } UTF8String.fromString(dt.catalogString) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 0e662b759caad..c9794a1a5d78a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -266,7 +266,7 @@ class JacksonParser( (parser: JsonParser) => parseJsonToken[Decimal](parser, dataType) { case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) - case VALUE_STRING => + case VALUE_STRING if parser.getTextLength >= 1 => val bigDecimal = decimalParser(parser.getText) Decimal(bigDecimal, dt.precision, dt.scale) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 9999a005106f9..ca1d0146c156e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -19,18 +19,25 @@ package org.apache.spark.sql.catalyst.json import java.util.Comparator +import 
scala.util.control.Exception.allCatch + import com.fasterxml.jackson.core._ import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion +import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -private[sql] object JsonInferSchema { +private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { + + private val decimalParser = { + ExprUtils.getDecimalParser(SQLConf.get.legacyDecimalParsing, options.locale) + } /** * Infer the type of a collection of json records in three stages: @@ -40,21 +47,20 @@ private[sql] object JsonInferSchema { */ def infer[T]( json: RDD[T], - configOptions: JSONOptions, createParser: (JsonFactory, T) => JsonParser): StructType = { - val parseMode = configOptions.parseMode - val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord + val parseMode = options.parseMode + val columnNameOfCorruptRecord = options.columnNameOfCorruptRecord // In each RDD partition, perform schema inference on each row and merge afterwards. - val typeMerger = compatibleRootType(columnNameOfCorruptRecord, parseMode) + val typeMerger = JsonInferSchema.compatibleRootType(columnNameOfCorruptRecord, parseMode) val mergedTypesFromPartitions = json.mapPartitions { iter => val factory = new JsonFactory() - configOptions.setJacksonOptions(factory) + options.setJacksonOptions(factory) iter.flatMap { row => try { Utils.tryWithResource(createParser(factory, row)) { parser => parser.nextToken() - Some(inferField(parser, configOptions)) + Some(inferField(parser)) } } catch { case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match { @@ -82,7 +88,7 @@ private[sql] object JsonInferSchema { } json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult) - canonicalizeType(rootType, configOptions) match { + canonicalizeType(rootType, options) match { case Some(st: StructType) => st case _ => // canonicalizeType erases all empty structs, including the only one we want to keep @@ -90,34 +96,17 @@ private[sql] object JsonInferSchema { } } - private[this] val structFieldComparator = new Comparator[StructField] { - override def compare(o1: StructField, o2: StructField): Int = { - o1.name.compareTo(o2.name) - } - } - - private def isSorted(arr: Array[StructField]): Boolean = { - var i: Int = 0 - while (i < arr.length - 1) { - if (structFieldComparator.compare(arr(i), arr(i + 1)) > 0) { - return false - } - i += 1 - } - true - } - /** * Infer the type of a json document from the parser's token stream */ - def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = { + def inferField(parser: JsonParser): DataType = { import com.fasterxml.jackson.core.JsonToken._ parser.getCurrentToken match { case null | VALUE_NULL => NullType case FIELD_NAME => parser.nextToken() - inferField(parser, configOptions) + inferField(parser) case VALUE_STRING if parser.getTextLength < 1 => // Zero length strings and nulls have special handling to deal @@ -128,18 +117,25 @@ private[sql] object JsonInferSchema { // record fields' types have been combined. 
NullType + case VALUE_STRING if options.prefersDecimal => + val decimalTry = allCatch opt { + val bigDecimal = decimalParser(parser.getText) + DecimalType(bigDecimal.precision, bigDecimal.scale) + } + decimalTry.getOrElse(StringType) case VALUE_STRING => StringType + case START_OBJECT => val builder = Array.newBuilder[StructField] while (nextUntil(parser, END_OBJECT)) { builder += StructField( parser.getCurrentName, - inferField(parser, configOptions), + inferField(parser), nullable = true) } val fields: Array[StructField] = builder.result() // Note: other code relies on this sorting for correctness, so don't remove it! - java.util.Arrays.sort(fields, structFieldComparator) + java.util.Arrays.sort(fields, JsonInferSchema.structFieldComparator) StructType(fields) case START_ARRAY => @@ -148,15 +144,15 @@ private[sql] object JsonInferSchema { // the type as we pass through all JSON objects. var elementType: DataType = NullType while (nextUntil(parser, END_ARRAY)) { - elementType = compatibleType( - elementType, inferField(parser, configOptions)) + elementType = JsonInferSchema.compatibleType( + elementType, inferField(parser)) } ArrayType(elementType) - case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if configOptions.primitivesAsString => StringType + case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if options.primitivesAsString => StringType - case (VALUE_TRUE | VALUE_FALSE) if configOptions.primitivesAsString => StringType + case (VALUE_TRUE | VALUE_FALSE) if options.primitivesAsString => StringType case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => import JsonParser.NumberType._ @@ -172,7 +168,7 @@ private[sql] object JsonInferSchema { } else { DoubleType } - case FLOAT | DOUBLE if configOptions.prefersDecimal => + case FLOAT | DOUBLE if options.prefersDecimal => val v = parser.getDecimalValue if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) { DecimalType(Math.max(v.precision(), v.scale()), v.scale()) @@ -217,12 +213,31 @@ private[sql] object JsonInferSchema { case other => Some(other) } +} + +object JsonInferSchema { + val structFieldComparator = new Comparator[StructField] { + override def compare(o1: StructField, o2: StructField): Int = { + o1.name.compareTo(o2.name) + } + } + + def isSorted(arr: Array[StructField]): Boolean = { + var i: Int = 0 + while (i < arr.length - 1) { + if (structFieldComparator.compare(arr(i), arr(i + 1)) > 0) { + return false + } + i += 1 + } + true + } - private def withCorruptField( + def withCorruptField( struct: StructType, other: DataType, columnNameOfCorruptRecords: String, - parseMode: ParseMode) = parseMode match { + parseMode: ParseMode): StructType = parseMode match { case PermissiveMode => // If we see any other data type at the root level, we get records that cannot be // parsed. So, we use the struct as the data type and add the corrupt field to the schema. @@ -230,7 +245,7 @@ private[sql] object JsonInferSchema { // If this given struct does not have a column used for corrupt records, // add this field. val newFields: Array[StructField] = - StructField(columnNameOfCorruptRecords, StringType, nullable = true) +: struct.fields + StructField(columnNameOfCorruptRecords, StringType, nullable = true) +: struct.fields // Note: other code relies on this sorting for correctness, so don't remove it! 
java.util.Arrays.sort(newFields, structFieldComparator) StructType(newFields) @@ -253,7 +268,7 @@ private[sql] object JsonInferSchema { /** * Remove top-level ArrayType wrappers and merge the remaining schemas */ - private def compatibleRootType( + def compatibleRootType( columnNameOfCorruptRecords: String, parseMode: ParseMode): (DataType, DataType) => DataType = { // Since we support array of json objects at the top level, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index c7608e2e881ff..456f08a2a2ee7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -107,7 +107,7 @@ object TextInputJsonDataSource extends JsonDataSource { }.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow)) SQLExecution.withSQLConfPropagated(json.sparkSession) { - JsonInferSchema.infer(rdd, parsedOptions, rowParser) + new JsonInferSchema(parsedOptions).infer(rdd, rowParser) } } @@ -166,7 +166,7 @@ object MultiLineJsonDataSource extends JsonDataSource { .getOrElse(createParser(_: JsonFactory, _: PortableDataStream)) SQLExecution.withSQLConfPropagated(sparkSession) { - JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser) + new JsonInferSchema(parsedOptions).infer[PortableDataStream](sampled, parser) } } From 1ec56e53ec7dfc79d4714f55a1166f3b66e6f03b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 28 Nov 2018 20:56:51 +0100 Subject: [PATCH 12/12] Addressing Wenchen's review comments --- docs/sql-migration-guide-upgrade.md | 2 -- .../sql/catalyst/expressions/ExprUtils.scala | 4 +-- .../sql/catalyst/json/JacksonParser.scala | 4 +-- .../sql/catalyst/json/JsonInferSchema.scala | 4 +-- .../apache/spark/sql/internal/SQLConf.scala | 9 ------ .../expressions/JsonExpressionsSuite.scala | 32 ++----------------- 6 files changed, 6 insertions(+), 49 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 7acf99102c2b9..68cb8f5a0d18c 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -9,8 +9,6 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.4 to 3.0 - - In Spark version 2.4 and earlier, accepted format of decimals parsed from JSON is an optional sign ('+' or '-'), followed by a sequence of zero or more decimal digits, optionally followed by a fraction, optionally followed by an exponent. Any commas were removed from the input before parsing. Since Spark 3.0, format varies and depends on locale which can be set via JSON option `locale`. The default locale is `en-US`. To switch back to previous behavior, set `spark.sql.legacy.decimalParsing.enabled` to `true`. - - Since Spark 3.0, the Dataset and DataFrame API `unionAll` is not deprecated any more. It is an alias for `union`. - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder comes to not update the configurations. This is the same behavior as Java/Scala API in 2.3 and above. 
If you want to update them, you need to update them prior to creating a `SparkSession`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index 8d9d40290e606..3f3d6b2b63a06 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -87,8 +87,8 @@ object ExprUtils { } } - def getDecimalParser(useLegacyParser: Boolean, locale: Locale): String => java.math.BigDecimal = { - if (useLegacyParser) { + def getDecimalParser(locale: Locale): String => java.math.BigDecimal = { + if (locale == Locale.US) { // Special handling the default locale for backward compatibility (s: String) => new java.math.BigDecimal(s.replaceAll(",", "")) } else { val decimalFormat = new DecimalFormat("", new DecimalFormatSymbols(locale)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index c9794a1a5d78a..df8044ea71e81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -136,9 +136,7 @@ class JacksonParser( } } - private val decimalParser = { - ExprUtils.getDecimalParser(SQLConf.get.legacyDecimalParsing, options.locale) - } + private val decimalParser = ExprUtils.getDecimalParser(options.locale) /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index ca1d0146c156e..263e05de32075 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -35,9 +35,7 @@ import org.apache.spark.util.Utils private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { - private val decimalParser = { - ExprUtils.getDecimalParser(SQLConf.get.legacyDecimalParsing, options.locale) - } + private val decimalParser = ExprUtils.getDecimalParser(options.locale) /** * Infer the type of a collection of json records in three stages: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d789fec9d5755..7bcf21595ce5a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1610,13 +1610,6 @@ object SQLConf { """ "... N more fields" placeholder.""") .intConf .createWithDefault(25) - - val LEGACY_DECIMAL_PARSING_ENABLED = buildConf("spark.sql.legacy.decimalParsing.enabled") - .doc("If it is set to false, it enables parsing decimals in locale specific formats. 
" + - "To switch back to previous behaviour when parsing was performed by java.math.BigDecimal " + - "and all commas were removed from the input, set the flag to true.") - .booleanConf - .createWithDefault(false) } /** @@ -2037,8 +2030,6 @@ class SQLConf extends Serializable with Logging { def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS) - def legacyDecimalParsing: Boolean = getConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED) - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 931a72ca80534..5d60cefc13896 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -788,21 +788,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with InternalRow(expected)) } - withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "false") { - Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalParsing) - } - - withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { - Seq("en-US", "ko-KR").foreach(checkDecimalParsing) - } - - withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { - Seq("ru-RU").foreach { langTag => - intercept[TestFailedException] { - checkDecimalParsing(langTag) - } - } - } + Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalParsing) } test("inferring the decimal type using locale") { @@ -815,22 +801,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with expectedType) } - withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "false") { - Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach { - checkDecimalInfer(_, """struct""") - } - } - - withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { - Seq("en-US", "ko-KR").foreach { + Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach { checkDecimalInfer(_, """struct""") - } - } - - withSQLConf(SQLConf.LEGACY_DECIMAL_PARSING_ENABLED.key -> "true") { - Seq("ru-RU").foreach { - checkDecimalInfer(_, """struct""") - } } } }