docs/sql-migration-guide.md (1 addition, 1 deletion)
@@ -37,7 +37,7 @@ license: |

- Since Spark 3.0, the Dataset and DataFrame API `unionAll` is not deprecated any more. It is an alias for `union`.

-- In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`.
+- In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType`, `DoubleType`, `DateType` and `TimestampType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`. The previous behavior of allowing empty strings can be restored by setting `spark.sql.legacy.json.allowEmptyString.enabled` to `true`.

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially when processing malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
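
To see the documented switch in action, here is a minimal sketch (assuming a local `SparkSession` named `spark`; the single-column schema `a INT` and the app name are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("json-empty-string").getOrCreate()
import spark.implicits._

val input = Seq("""{"a":""}""").toDS()

// Spark 3.0 default: empty strings are rejected for non-string/binary types,
// so this collect() would throw (FAILFAST surfaces the parse error immediately):
// spark.read.schema("a INT").option("mode", "FAILFAST").json(input).collect()

// With the legacy flag enabled, the empty string parses as null for IntegerType again.
spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled", "true")
spark.read.schema("a INT").option("mode", "FAILFAST").json(input).show()
// +----+
// |   a|
// +----+
// |null|
// +----+
```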

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils
@@ -307,6 +308,8 @@ class JacksonParser(
}
}

+  private val allowEmptyString = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON)

  /**
   * This function throws an exception for failed conversion. For empty string on data types
   * except for string and binary types, this also throws an exception.
@@ -315,7 +318,16 @@
      parser: JsonParser,
      dataType: DataType): PartialFunction[JsonToken, R] = {

-    // SPARK-25040: Disallow empty strings for data types except for string and binary types.
+    // SPARK-25040: Disallows empty strings for data types except for string and binary types.
+    // But treats empty strings as null for certain types if the legacy config is enabled.
+    case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
+      dataType match {
+        case FloatType | DoubleType | TimestampType | DateType =>
+          throw new RuntimeException(
+            s"Failed to parse an empty string for data type ${dataType.catalogString}")
+        case _ => null
+      }
+
    case VALUE_STRING if parser.getTextLength < 1 =>
      throw new RuntimeException(
        s"Failed to parse an empty string for data type ${dataType.catalogString}")
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1989,6 +1989,14 @@ object SQLConf {
      .booleanConf
      .createWithDefault(false)

+  val LEGACY_ALLOW_EMPTY_STRING_IN_JSON =
+    buildConf("spark.sql.legacy.json.allowEmptyString.enabled")
+      .internal()
+      .doc("When set to true, the parser of JSON data source treats empty strings as null for " +
+        "some data types such as `IntegerType`.")
+      .booleanConf
+      .createWithDefault(false)

  val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL =
    buildConf("spark.sql.truncateTable.ignorePermissionAcl.enabled")
      .internal()
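
For reference, this is how the new entry is read (mirroring the `JacksonParser` change above; `SQLConf` lives in Spark's internal package, so this pattern applies inside Spark's own code rather than in user applications):

```scala
import org.apache.spark.sql.internal.SQLConf

// SQLConf.get resolves values for the active session, so the parser picks up
// whatever the current session has set for the legacy flag.
val allowEmptyString: Boolean =
  SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON)

// End users toggle it by key, e.g.
//   spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled", "true")
```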
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2436,23 +2436,24 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
}
}

test("SPARK-25040: empty strings should be disallowed") {
def failedOnEmptyString(dataType: DataType): Unit = {
val df = spark.read.schema(s"a ${dataType.catalogString}")
.option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
val errMessage = intercept[SparkException] {
df.collect()
}.getMessage
assert(errMessage.contains(
s"Failed to parse an empty string for data type ${dataType.catalogString}"))
}

def emptyString(dataType: DataType, expected: Any): Unit = {
val df = spark.read.schema(s"a ${dataType.catalogString}")
.option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
checkAnswer(df, Row(expected) :: Nil)
}
private def failedOnEmptyString(dataType: DataType): Unit = {
val df = spark.read.schema(s"a ${dataType.catalogString}")
.option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
val errMessage = intercept[SparkException] {
df.collect()
}.getMessage
assert(errMessage.contains(
s"Failed to parse an empty string for data type ${dataType.catalogString}"))
}

private def emptyString(dataType: DataType, expected: Any): Unit = {
val df = spark.read.schema(s"a ${dataType.catalogString}")
.option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
checkAnswer(df, Row(expected) :: Nil)
}

test("SPARK-25040: empty strings should be disallowed") {
    failedOnEmptyString(BooleanType)
    failedOnEmptyString(ByteType)
    failedOnEmptyString(ShortType)
@@ -2471,6 +2472,36 @@
    emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
  }

test("SPARK-25040: allowing empty strings when legacy config is enabled") {
def emptyStringAsNull(dataType: DataType): Unit = {
val df = spark.read.schema(s"a ${dataType.catalogString}")
.option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
checkAnswer(df, Row(null) :: Nil)
}

// Legacy mode prior to Spark 3.0.0
withSQLConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON.key -> "true") {
emptyStringAsNull(BooleanType)
emptyStringAsNull(ByteType)
emptyStringAsNull(ShortType)
emptyStringAsNull(IntegerType)
emptyStringAsNull(LongType)

failedOnEmptyString(FloatType)
failedOnEmptyString(DoubleType)
failedOnEmptyString(TimestampType)
failedOnEmptyString(DateType)

emptyStringAsNull(DecimalType.SYSTEM_DEFAULT)
emptyStringAsNull(ArrayType(IntegerType))
emptyStringAsNull(MapType(StringType, IntegerType, true))
emptyStringAsNull(StructType(StructField("f1", IntegerType, true) :: Nil))

emptyString(StringType, "")
emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
}
}
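
The `withSQLConf` helper used in the new test comes from Spark's SQL test utilities: it sets the given keys for the duration of the block and restores the prior values afterwards, so the legacy flag cannot leak into other tests. A self-contained sketch of that scoped-override pattern (a mutable map stands in for the session conf; all names here are illustrative):

```scala
import scala.collection.mutable

// Stand-in for the session conf store (illustrative, not Spark's).
val sessionConf = mutable.Map[String, String]()

// Set the keys, run the body, then restore prior values even if the body throws.
def withConf(pairs: (String, String)*)(body: => Unit): Unit = {
  val saved = pairs.map { case (k, _) => k -> sessionConf.get(k) }
  pairs.foreach { case (k, v) => sessionConf(k) = v }
  try body
  finally saved.foreach {
    case (k, Some(old)) => sessionConf(k) = old
    case (k, None) => sessionConf.remove(k)
  }
}

withConf("spark.sql.legacy.json.allowEmptyString.enabled" -> "true") {
  assert(sessionConf("spark.sql.legacy.json.allowEmptyString.enabled") == "true")
}
assert(!sessionConf.contains("spark.sql.legacy.json.allowEmptyString.enabled"))
```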

test("return partial result for bad records") {
val schema = "a double, b array<int>, c string, _corrupt_record string"
val badRecords = Seq(