diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a5ef1c2b1d045..0c47370283736 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -37,7 +37,7 @@ license: |
 
   - Since Spark 3.0, the Dataset and DataFrame API `unionAll` is not deprecated any more. It is an alias for `union`.
 
-  - In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`.
+  - In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType`, `DoubleType`, `DateType` and `TimestampType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`. The previous behavior of allowing empty strings can be restored by setting `spark.sql.legacy.json.allowEmptyString.enabled` to `true`.
 
   - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index c44025ca8bcfd..76efa574a99ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils
@@ -307,6 +308,8 @@ class JacksonParser(
     }
   }
 
+  private val allowEmptyString = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON)
+
   /**
    * This function throws an exception for failed conversion. For empty string on data types
    * except for string and binary types, this also throws an exception.
@@ -315,7 +318,16 @@
       parser: JsonParser,
       dataType: DataType): PartialFunction[JsonToken, R] = {
-    // SPARK-25040: Disallow empty strings for data types except for string and binary types.
+    // SPARK-25040: Disallows empty strings for data types except for string and binary types.
+    // But treats empty strings as null for certain types if the legacy config is enabled.
+    case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
+      dataType match {
+        case FloatType | DoubleType | TimestampType | DateType =>
+          throw new RuntimeException(
+            s"Failed to parse an empty string for data type ${dataType.catalogString}")
+        case _ => null
+      }
+
     case VALUE_STRING if parser.getTextLength < 1 =>
       throw new RuntimeException(
         s"Failed to parse an empty string for data type ${dataType.catalogString}")
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b94ddbdc0fc9a..5ce5692123805 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1989,6 +1989,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_ALLOW_EMPTY_STRING_IN_JSON =
+    buildConf("spark.sql.legacy.json.allowEmptyString.enabled")
+      .internal()
+      .doc("When set to true, the parser of JSON data source treats empty strings as null for " +
+        "some data types such as `IntegerType`.")
+      .booleanConf
+      .createWithDefault(false)
+
   val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL =
     buildConf("spark.sql.truncateTable.ignorePermissionAcl.enabled")
       .internal()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index d0e2e001034fb..b20da2266b0f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2436,23 +2436,24 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
     }
   }
 
-  test("SPARK-25040: empty strings should be disallowed") {
-    def failedOnEmptyString(dataType: DataType): Unit = {
-      val df = spark.read.schema(s"a ${dataType.catalogString}")
-        .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
-      val errMessage = intercept[SparkException] {
-        df.collect()
-      }.getMessage
-      assert(errMessage.contains(
-        s"Failed to parse an empty string for data type ${dataType.catalogString}"))
-    }
-    def emptyString(dataType: DataType, expected: Any): Unit = {
-      val df = spark.read.schema(s"a ${dataType.catalogString}")
-        .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
-      checkAnswer(df, Row(expected) :: Nil)
-    }
+  private def failedOnEmptyString(dataType: DataType): Unit = {
+    val df = spark.read.schema(s"a ${dataType.catalogString}")
+      .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
+    val errMessage = intercept[SparkException] {
+      df.collect()
+    }.getMessage
+    assert(errMessage.contains(
+      s"Failed to parse an empty string for data type ${dataType.catalogString}"))
+  }
+  private def emptyString(dataType: DataType, expected: Any): Unit = {
+    val df = spark.read.schema(s"a ${dataType.catalogString}")
+      .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
+    checkAnswer(df, Row(expected) :: Nil)
+  }
+
+  test("SPARK-25040: empty strings should be disallowed") {
     failedOnEmptyString(BooleanType)
     failedOnEmptyString(ByteType)
     failedOnEmptyString(ShortType)
@@ -2471,6 +2472,36 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
     emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
   }
 
+  test("SPARK-25040: allowing empty strings when legacy config is enabled") {
+    def emptyStringAsNull(dataType: DataType): Unit = {
+      val df = spark.read.schema(s"a ${dataType.catalogString}")
+        .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
+      checkAnswer(df, Row(null) :: Nil)
+    }
+
+    // Legacy mode prior to Spark 3.0.0
+    withSQLConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON.key -> "true") {
+      emptyStringAsNull(BooleanType)
+      emptyStringAsNull(ByteType)
+      emptyStringAsNull(ShortType)
+      emptyStringAsNull(IntegerType)
+      emptyStringAsNull(LongType)
+
+      failedOnEmptyString(FloatType)
+      failedOnEmptyString(DoubleType)
+      failedOnEmptyString(TimestampType)
+      failedOnEmptyString(DateType)
+
+      emptyStringAsNull(DecimalType.SYSTEM_DEFAULT)
+      emptyStringAsNull(ArrayType(IntegerType))
+      emptyStringAsNull(MapType(StringType, IntegerType, true))
+      emptyStringAsNull(StructType(StructField("f1", IntegerType, true) :: Nil))
+
+      emptyString(StringType, "")
+      emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
+    }
+  }
+
   test("return partial result for bad records") {
     val schema = "a double, b array<int>, c string, _corrupt_record string"
     val badRecords = Seq(
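For reference, the user-facing effect of the new flag can be sketched as below. This is an illustrative snippet, not part of the patch: it assumes a Spark build that includes this change, a local `SparkSession`, and the made-up object name `AllowEmptyStringExample`.

```scala
// Minimal sketch of the legacy behavior restored by
// spark.sql.legacy.json.allowEmptyString.enabled (names here are illustrative).
import org.apache.spark.sql.SparkSession

object AllowEmptyStringExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("allow-empty-string-in-json")
      .getOrCreate()
    import spark.implicits._

    // Enable the legacy mode introduced by this patch.
    spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled", "true")

    // With the flag on, an empty string parses to null for IntegerType instead of
    // failing with "Failed to parse an empty string for data type int".
    val df = spark.read.schema("a INT").json(Seq("""{"a":""}""").toDS)
    df.show()
    // +----+
    // |   a|
    // +----+
    // |null|
    // +----+

    spark.stop()
  }
}
```

Note that even with the flag enabled, `FloatType`, `DoubleType`, `TimestampType` and `DateType` still reject empty strings, matching the Spark 2.4 behavior exercised by the new test above.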