docs/sql-migration-guide.md: 4 additions, 0 deletions
@@ -22,6 +22,10 @@ license: |
* Table of contents
{:toc}

## Upgrading from Spark SQL 3.2 to 3.3

- In Spark 3.3, Spark fails when parsing a JSON/CSV string in `PERMISSIVE` mode if the schema contains non-nullable fields. You can set the mode to `FAILFAST` or `DROPMALFORMED` if you want to read JSON/CSV with a schema that contains non-nullable fields (see the example below).
Comment (Contributor Author): This change also affects parsing CSV strings; I will add a unit test for CSV in a separate PR. cc @cloud-fan @HyukjinKwon

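A minimal sketch of the new behavior (assuming an active `SparkSession` named `spark`; the path, schema, and data are hypothetical):

```scala
import org.apache.spark.sql.types._

// Hypothetical schema with a non-nullable field.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

// Since Spark 3.3 this fails: PERMISSIVE (the default mode) only works
// with nullable fields.
// spark.read.schema(schema).json("/tmp/people.json")

// FAILFAST and DROPMALFORMED accept the non-nullable schema. FAILFAST still
// fails at runtime if a record is missing "id" or carries an explicit null;
// DROPMALFORMED silently drops such records.
val df = spark.read
  .option("mode", "FAILFAST")
  .schema(schema)
  .json("/tmp/people.json")
```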

## Upgrading from Spark SQL 3.1 to 3.2

- Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespace (see the sketch below).
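A hedged illustration (the path is hypothetical; assumes an active `SparkSession` named `spark`):

```scala
// Since Spark 3.2, a path containing whitespace must be quoted.
spark.sql("ADD FILE '/data/my conf/app.properties'")   // quoted: accepted
// spark.sql("ADD FILE /data/my conf/app.properties")  // unquoted: fails to parse in 3.2+
```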
@@ -405,10 +405,18 @@ class JacksonParser(
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
try {
row.update(index, fieldConverters(index).apply(parser))
val fieldValue = fieldConverters(index).apply(parser)
Comment (Member): I am not sure we really need this complicated fix for the nullability mismatch (which is rather a corner case), to be honest. I wonder if there's a simpler approach, e.g. simply warning on non-nullable columns? Just to be clear, I don't mind if other committers prefer to fix it.

Comment (Contributor): That's another proposal I mentioned earlier: if the user-given schema is non-nullable, we just turn it into a nullable schema and don't fail.

Comment (Contributor Author): Thank you for your suggestions; I'll raise a new PR.

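A minimal sketch of the alternative proposed in this thread, using a hypothetical helper that is not part of this PR: recursively rewrite the user-given schema as nullable before parsing.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: force every field, including fields nested inside
// structs, arrays, and maps, to nullable, so the parser never encounters
// a non-nullable field in PERMISSIVE mode.
def toNullable(dt: DataType): DataType = dt match {
  case st: StructType =>
    StructType(st.fields.map(f =>
      f.copy(dataType = toNullable(f.dataType), nullable = true)))
  case ArrayType(elementType, _) =>
    ArrayType(toNullable(elementType), containsNull = true)
  case MapType(keyType, valueType, _) =>
    MapType(toNullable(keyType), toNullable(valueType), valueContainsNull = true)
  case other => other
}
```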
val isIllegal =
options.parseMode != PermissiveMode && !schema(index).nullable && fieldValue == null
if (isIllegal) {
throw new IllegalSchemaArgumentException(
s"field ${schema(index).name} is not nullable but the parsed value is null.")
}
row.update(index, fieldValue)
skipRow = structFilters.skipRow(row, index)
} catch {
case e: SparkUpgradeException => throw e
case e: IllegalSchemaArgumentException => throw e
case NonFatal(e) if isRoot =>
badRecordException = badRecordException.orElse(Some(e))
parser.skipChildren()
@@ -418,6 +426,9 @@
}
}

// When a field in the input schema is set to `nullable = false`, make sure the parsed value is not null.
checkNotNullableInRow(row, schema, skipRow, badRecordException)

if (skipRow) {
None
} else if (badRecordException.isEmpty) {
@@ -427,6 +438,28 @@
}
}

// As PERMISSIVE mode only works with nullable fields, we can skip this non-nullable check when
// the mode is PERMISSIVE (see FailureSafeParser.checkNullabilityForPermissiveMode).
private lazy val checkNotNullableInRow = if (options.parseMode != PermissiveMode) {
(row: GenericInternalRow,
schema: StructType,
skipRow: Boolean,
runtimeExceptionOption: Option[Throwable]) => {
if (runtimeExceptionOption.isEmpty && !skipRow) {
var index = 0
while (index < schema.length) {
if (!schema(index).nullable && row.isNullAt(index)) {
throw new IllegalSchemaArgumentException(
s"field ${schema(index).name} is not nullable but it's missing in one record.")
}
index += 1
}
}
}
} else {
(_: GenericInternalRow, _: StructType, _: Boolean, _: Option[Throwable]) => {}
}

/**
* Parse an object as a Map, preserving all fields.
*/
@@ -483,6 +516,7 @@
}
} catch {
case e: SparkUpgradeException => throw e
case e: IllegalSchemaArgumentException => throw e
case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
@@ -41,3 +41,8 @@ case class BadRecordException(
record: () => UTF8String,
partialResult: () => Option[InternalRow],
cause: Throwable) extends Exception(cause)

/**
* Exception thrown when the parsed value is null but the schema field is set to non-nullable.
*/
case class IllegalSchemaArgumentException(message: String) extends Exception(message)
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

class FailureSafeParser[IN](
@@ -29,11 +29,30 @@
schema: StructType,
columnNameOfCorruptRecord: String) {

checkNullabilityForPermissiveMode()
private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
private val resultRow = new GenericInternalRow(schema.length)
private val nullResult = new GenericInternalRow(schema.length)

// Because PERMISSIVE mode should not fail at runtime, fail up front if the mode is PERMISSIVE
// and the schema contains non-nullable fields.
private def checkNullabilityForPermissiveMode(): Unit = {
def checkNotNullableRecursively(schema: StructType): Unit = {
schema.fields.foreach {
case StructField(name, _, false, _) =>
throw new IllegalSchemaArgumentException(s"Field $name is not nullable but " +
"PERMISSIVE mode only works with nullable fields.")
case StructField(_, dt: StructType, _, _) => checkNotNullableRecursively(dt)
case _ =>
}
}
mode match {
case PermissiveMode => checkNotNullableRecursively(schema)
case _ =>
}
}

// This function takes 2 parameters: an optional partial result, and the bad record. If the given
// schema doesn't contain a field for the corrupted record, we just return the partial result or a
// row with all fields null. If the given schema contains a field for the corrupted record, we will
@@ -67,6 +86,8 @@
case FailFastMode =>
throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(e)
}
case _: IllegalSchemaArgumentException if mode == DropMalformedMode =>
Iterator.empty
}
}
}
@@ -19,24 +19,27 @@ package org.apache.spark.sql.catalyst.json

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.IllegalSchemaArgumentException
import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

class JacksonParserSuite extends SparkFunSuite {
test("skipping rows using pushdown filters") {
def check(

private def check(
input: String = """{"i":1, "s": "a"}""",
schema: StructType = StructType.fromDDL("i INTEGER"),
filters: Seq[Filter],
config: Map[String, String] = Map.empty,
expected: Seq[InternalRow]): Unit = {
val options = new JSONOptions(Map.empty[String, String], "GMT", "")
val parser = new JacksonParser(schema, options, false, filters)
val createParser = CreateJacksonParser.string _
val actual = parser.parse(input, createParser, UTF8String.fromString)
assert(actual === expected)
}
val options = new JSONOptions(config, "GMT", "")
val parser = new JacksonParser(schema, options, false, filters)
val createParser = CreateJacksonParser.string _
val actual = parser.parse(input, createParser, UTF8String.fromString)
assert(actual === expected)
}

test("skipping rows using pushdown filters") {
check(filters = Seq(), expected = Seq(InternalRow(1)))
check(filters = Seq(EqualTo("i", 1)), expected = Seq(InternalRow(1)))
check(filters = Seq(EqualTo("i", 2)), expected = Seq.empty)
@@ -54,4 +57,42 @@
filters = Seq(EqualTo("d", 3.14)),
expected = Seq(InternalRow(1, 3.14)))
}

test("SPARK-35912: nullability with different schema nullable setting") {
val missingFieldInput = """{"c1":1}"""
val nullValueInput = """{"c1": 1, "c2": null}"""

def assertAction(nullable: Boolean, input: String)(action: => Unit): Unit = {
if (nullable) {
action
} else {
val msg = intercept[IllegalSchemaArgumentException] {
action
}.message
val expected = if (input == missingFieldInput) {
"field c2 is not nullable but it's missing in one record."
} else {
"field c2 is not nullable but the parsed value is null."
}
assert(msg.contains(expected))
}
}

Seq("FAILFAST", "DROPMALFORMED").foreach { mode =>
val config = Map("mode" -> mode)
Seq(true, false).foreach { nullable =>
val schema = StructType(Seq(
StructField("c1", IntegerType),
StructField("c2", IntegerType, nullable = nullable)
))
val expected = Seq(InternalRow(1, null))
Seq(missingFieldInput, nullValueInput).foreach { input =>
assertAction(nullable, input) {
check(input = input, schema = schema, filters = Seq.empty, config = config,
expected = expected)
}
}
}
}
}
}
@@ -182,8 +182,8 @@ class DeprecatedAPISuite extends QueryTest with SharedSparkSession {
jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD())
checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)

schema = StructType(StructField("name", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)
schema = StructType(StructField("name", StringType, true) ::
StructField("age", IntegerType, true) :: Nil)
jsonDF = sqlContext.jsonRDD(jsonRDD, schema)
checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), schema)
@@ -147,8 +147,8 @@ class UserDefinedTypeSuite extends QueryTest with SharedSparkSession with Parque
"{\"id\":2,\"vec\":[2.25,4.5,8.75]}"
)
val schema = StructType(Seq(
StructField("id", IntegerType, false),
StructField("vec", new TestUDT.MyDenseVectorUDT, false)
StructField("id", IntegerType, true),
StructField("vec", new TestUDT.MyDenseVectorUDT, true)
))

val jsonRDD = spark.read.schema(schema).json(data.toDS())
@@ -167,8 +167,8 @@
)

val schema = StructType(Seq(
StructField("id", IntegerType, false),
StructField("vec", new TestUDT.MyDenseVectorUDT, false)
StructField("id", IntegerType, true),
StructField("vec", new TestUDT.MyDenseVectorUDT, true)
))

val jsonDataset = spark.read.schema(schema).json(data.toDS())
@@ -924,7 +924,7 @@ abstract class JsonSuite
test("Applying schemas with MapType") {
withTempView("jsonWithSimpleMap", "jsonWithComplexMap") {
val schemaWithSimpleMap = StructType(
StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
StructField("map", MapType(StringType, IntegerType, true), true) :: Nil)
val jsonWithSimpleMap = spark.read.schema(schemaWithSimpleMap).json(mapType1)

jsonWithSimpleMap.createOrReplaceTempView("jsonWithSimpleMap")
@@ -953,7 +953,7 @@
StructField("field1", ArrayType(IntegerType, true), true) ::
StructField("field2", IntegerType, true) :: Nil)
val schemaWithComplexMap = StructType(
StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
StructField("map", MapType(StringType, innerStruct, true), true) :: Nil)

val jsonWithComplexMap = spark.read.schema(schemaWithComplexMap).json(mapType2)

@@ -1392,7 +1392,7 @@
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempDir { dir =>
val schemaWithSimpleMap = StructType(
StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
StructField("map", MapType(StringType, IntegerType, true), true) :: Nil)
val df = spark.read.schema(schemaWithSimpleMap).json(mapType1)

val path = dir.getAbsolutePath
@@ -2919,6 +2919,55 @@
}
}
}

test("SPARK-35912: nullability with different parse mode -- struct") {
// JSON field is missing.
val missingFieldInput = """{"c1": 1}"""
// JSON field is null.
val nullValueInput = """{"c1": 1, "c2": null}"""

val load = (mode: String, schema: StructType, inputJson: String) => {
val json = spark.createDataset(
spark.sparkContext.parallelize(inputJson :: Nil))(Encoders.STRING)
spark.read
.option("mode", mode)
.schema(schema)
.json(json)
}

Seq(true, false).foreach { nullable =>
val schema = StructType(Seq(
StructField("c1", IntegerType, nullable = true),
StructField("c2", IntegerType, nullable = nullable)))

Seq(missingFieldInput, nullValueInput).foreach { jsonString =>
if (nullable) {
checkAnswer(load("DROPMALFORMED", schema, jsonString), Row(1, null) :: Nil)
checkAnswer(load("FAILFAST", schema, jsonString), Row(1, null) :: Nil)
checkAnswer(load("PERMISSIVE", schema, jsonString), Row(1, null) :: Nil)
} else {
checkAnswer(load("DROPMALFORMED", schema, jsonString), Seq.empty)

val exceptionMsg1 = intercept[SparkException] {
load("FAILFAST", schema, jsonString).collect
}.getMessage
val expectedMsg1 = if (jsonString == missingFieldInput) {
"field c2 is not nullable but it's missing in one record."
} else {
s"field c2 is not nullable but the parsed value is null."
}
assert(exceptionMsg1.contains(expectedMsg1))

val exceptionMsg2 = intercept[SparkException] {
load("PERMISSIVE", schema, jsonString).collect
Comment (Contributor Author): @cloud-fan Since the field is non-nullable, do we still return Row(1, null) for PERMISSIVE mode? If yes, this may cause the cast-struct problem we talked about before: the field is non-nullable but row.isNullAt(index) is true.

Comment (Contributor): Yeah, I think we can't apply PERMISSIVE mode here. Failing looks fine to me.

Comment (Contributor): Actually, PERMISSIVE mode should not fail at runtime. Shall we fail at the very beginning if the mode is PERMISSIVE and the schema contains non-nullable fields?

Comment (Contributor Author): Done; added a non-nullable check for PERMISSIVE mode when we initialize FailureSafeParser.

}.getMessage
val expectedMsg2 =
"Field c2 is not nullable but PERMISSIVE mode only works with nullable fields."
Comment (Contributor): Can we add a migration guide entry for it? This is a breaking change, as PERMISSIVE is the default mode. Also cc @HyukjinKwon for this breaking change.

Comment (Contributor): Another choice is to not respect the user-given schema and always turn it into a nullable schema if the mode is PERMISSIVE.

assert(exceptionMsg2.contains(expectedMsg2))
}
}
}
}
}

class JsonV1Suite extends JsonSuite {