From 6f29bea268661cbfc75e89f824ec223c2a41ade1 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Mon, 5 Jul 2021 15:08:55 +0800 Subject: [PATCH 01/14] fix spark.read.json nullability. --- .../sql/catalyst/json/JacksonParser.scala | 19 ++++ .../catalyst/json/JacksonParserSuite.scala | 89 ++++++++++++++++--- 2 files changed, 98 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 8a1191c5b7ee2..acf7698333d7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -29,6 +29,7 @@ import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.isPrimitiveType import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors @@ -399,6 +400,7 @@ class JacksonParser( val row = new GenericInternalRow(schema.length) var badRecordException: Option[Throwable] = None var skipRow = false + var checkedIndexSet = Set.empty[Int] structFilters.reset() while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) { @@ -407,6 +409,7 @@ class JacksonParser( try { row.update(index, fieldConverters(index).apply(parser)) skipRow = structFilters.skipRow(row, index) + checkedIndexSet += index } catch { case e: SparkUpgradeException => throw e case NonFatal(e) if isRoot => @@ -418,6 +421,22 @@ class JacksonParser( } } + // When the input schema is setting to `nullable = false`, make sure the primitive type has a + // default value rather than setting a null value. this is because Spark uses + // `InternalRow.isNullAt(index)` to guarantees the actual value is null or not. 
+ var index = 0 + while (!skipRow && index < schema.length) { + val sf = schema(index) + if (!sf.nullable && isPrimitiveType(sf.dataType) && row.isNullAt(index)) { + val writer = InternalRow.getWriter(index, schema(index).dataType) + writer(row, null) + } + if (!checkedIndexSet.contains(index)) { + skipRow = structFilters.skipRow(row, index) + } + index += 1 + } + if (skipRow) { None } else if (badRecordException.isEmpty) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala index 587e22e787b87..e496ab22e5a04 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala @@ -19,24 +19,24 @@ package org.apache.spark.sql.catalyst.json import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull, IsNull, StringStartsWith} +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String class JacksonParserSuite extends SparkFunSuite { - test("skipping rows using pushdown filters") { - def check( + private def check( input: String = """{"i":1, "s": "a"}""", schema: StructType = StructType.fromDDL("i INTEGER"), filters: Seq[Filter], expected: Seq[InternalRow]): Unit = { - val options = new JSONOptions(Map.empty[String, String], "GMT", "") - val parser = new JacksonParser(schema, options, false, filters) - val createParser = CreateJacksonParser.string _ - val actual = parser.parse(input, createParser, UTF8String.fromString) - assert(actual === expected) - } + val options = new JSONOptions(Map.empty[String, String], "GMT", "") + val parser = new JacksonParser(schema, options, false, filters) + val createParser = CreateJacksonParser.string _ + val actual = parser.parse(input, createParser, UTF8String.fromString) + assert(actual === expected) + } + test("skipping rows using pushdown filters") { check(filters = Seq(), expected = Seq(InternalRow(1))) check(filters = Seq(EqualTo("i", 1)), expected = Seq(InternalRow(1))) check(filters = Seq(EqualTo("i", 2)), expected = Seq.empty) @@ -54,4 +54,73 @@ class JacksonParserSuite extends SparkFunSuite { filters = Seq(EqualTo("d", 3.14)), expected = Seq(InternalRow(1, 3.14))) } + + test("35912: nullability with different schema nullable setting") { + // primitive type default value. 
+    val defaultValue: Map[DataType, Any] = Map(
+      BooleanType -> false,
+      ByteType -> 0.toByte,
+      ShortType -> 0.toShort,
+      IntegerType -> 0,
+      LongType -> 0.toLong,
+      FloatType -> 0.0F,
+      DoubleType -> 0.0D)
+    val metadata = new MetadataBuilder()
+      .putString("name", "age")
+      .build()
+    val structType = StructType(Seq(
+      StructField("a", IntegerType, nullable = true),
+      StructField("b", ArrayType(DoubleType), nullable = false),
+      StructField("c", DoubleType, nullable = false, metadata)))
+    val a1 = ArrayType(DoubleType, true)
+    val a2 = ArrayType(StringType, false)
+    val m1 = MapType(IntegerType, StringType, true)
+    val m2 = MapType(IntegerType, ArrayType(DoubleType), false)
+    val dataTypeSet: Set[DataType] = Set(
+      BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, NullType,
+      StringType, CalendarIntervalType, DecimalType.SYSTEM_DEFAULT, structType, a1, a2, m1, m2
+    )
+    dataTypeSet.foreach { dt =>
+      Seq(true, false).foreach { nullable =>
+        val schema = StructType(Seq(
+          StructField("i", IntegerType),
+          StructField("not_exist_col", dt, nullable = nullable)
+        ))
+        val expected = if (nullable) {
+          Seq(InternalRow(1, null))
+        } else {
+          Seq(InternalRow(1, defaultValue.getOrElse(dt, null)))
+        }
+        check(schema = schema, filters = Seq(EqualTo("i", 1)), expected = expected)
+      }
+    }
+
+    val schema = (nullable: Boolean) => StructType(Seq(
+      StructField("i", IntegerType),
+      StructField("not_exist_col", IntegerType, nullable = nullable)
+    ))
+    // filter by not exist column
+    Seq(true, false).foreach { nullable =>
+      val s = schema(nullable)
+      check(schema = s, filters = Seq(EqualTo("not_exist_col", 1)), expected = Seq.empty)
+
+      val expected1 = if (nullable) Seq.empty else Seq(InternalRow(1, 0))
+      check(schema = s, filters = Seq(EqualTo("not_exist_col", 0)), expected = expected1)
+
+      val expected2 = if (nullable) Seq.empty else Seq(InternalRow(1, 0))
+      check(schema = s, filters = Seq(IsNotNull("not_exist_col")), expected = expected2)
+
+      val expected3 = if (nullable) Seq(InternalRow(1, null)) else Seq.empty
+      check(schema = s, filters = Seq(IsNull("not_exist_col")), expected = expected3)
+
+      val input = """{"a": 1, "b": null}"""
+      val s2 = StructType(Seq(
+        StructField("a", IntegerType),
+        StructField("b", IntegerType, nullable = nullable)))
+      val expected4 = if (nullable) Seq.empty else Seq(InternalRow(1, 0))
+      check(input = input, schema = s2, filters = Seq(IsNotNull("b")), expected = expected4)
+      val expected5 = if (nullable) Seq(InternalRow(1, null)) else Seq.empty
+      check(input = input, schema = s2, filters = Seq(IsNull("b")), expected = expected5)
+    }
+  }
 }

From 1b8dade475852d62cd9defc8fa6ff5cc7d26d181 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Tue, 6 Jul 2021 18:06:47 +0800
Subject: [PATCH 02/14] fail instead of setting a default value

---
 .../sql/catalyst/json/JacksonParser.scala     | 16 ++--
 .../sql/catalyst/util/FailureSafeParser.scala |  2 +
 .../catalyst/json/JacksonParserSuite.scala    | 81 ++++++++-----------
 .../datasources/json/JsonSuite.scala          | 44 ++++++++++
 4 files changed, 88 insertions(+), 55 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index acf7698333d7f..e1bf3b9517994 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -29,7 +29,6 @@ import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.isPrimitiveType import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors @@ -412,6 +411,7 @@ class JacksonParser( checkedIndexSet += index } catch { case e: SparkUpgradeException => throw e + case e: IllegalArgumentException => throw e case NonFatal(e) if isRoot => badRecordException = badRecordException.orElse(Some(e)) parser.skipChildren() @@ -421,15 +421,12 @@ class JacksonParser( } } - // When the input schema is setting to `nullable = false`, make sure the primitive type has a - // default value rather than setting a null value. this is because Spark uses - // `InternalRow.isNullAt(index)` to guarantees the actual value is null or not. + // When the input schema is setting to `nullable = false`, make sure the field is not null. var index = 0 - while (!skipRow && index < schema.length) { - val sf = schema(index) - if (!sf.nullable && isPrimitiveType(sf.dataType) && row.isNullAt(index)) { - val writer = InternalRow.getWriter(index, schema(index).dataType) - writer(row, null) + while (badRecordException.isEmpty && !skipRow && index < schema.length) { + if (!schema(index).nullable && row.isNullAt(index)) { + throw new IllegalArgumentException( + s"the null value found when parsing non-nullable field ${schema(index).name}.") } if (!checkedIndexSet.contains(index)) { skipRow = structFilters.skipRow(row, index) @@ -502,6 +499,7 @@ class JacksonParser( } } catch { case e: SparkUpgradeException => throw e + case e: IllegalArgumentException => throw e case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) => // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index ab7c9310bf844..ac29e86fc2138 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -67,6 +67,8 @@ class FailureSafeParser[IN]( case FailFastMode => throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(e) } + case _ if (mode == DropMalformedMode) => + Iterator.empty } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala index e496ab22e5a04..86bc1230a28c2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala @@ -56,41 +56,22 @@ class JacksonParserSuite extends SparkFunSuite { } test("35912: nullability with different schema nullable setting") { - // primitive type default value. 
- val defaultValue: Map[DataType, Any] = Map( - BooleanType -> false, - ByteType -> 0.toByte, - ShortType -> 0.toShort, - IntegerType -> 0, - LongType -> 0.toLong, - FloatType -> 0.0F, - DoubleType -> 0.0D) - val metadata = new MetadataBuilder() - .putString("name", "age") - .build() - val structType = StructType(Seq( - StructField("a", IntegerType, nullable = true), - StructField("b", ArrayType(DoubleType), nullable = false), - StructField("c", DoubleType, nullable = false, metadata))) - val a1 = ArrayType(DoubleType, true) - val a2 = ArrayType(StringType, false) - val m1 = MapType(IntegerType, StringType, true) - val m2 = MapType(IntegerType, ArrayType(DoubleType), false) - val dataTypeSet: Set[DataType] = Set( - BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, NullType, - StringType, CalendarIntervalType, DecimalType.SYSTEM_DEFAULT, structType, a1, a2, m1, m2 - ) - dataTypeSet.foreach { dt => - Seq(true, false).foreach { nullable => - val schema = StructType(Seq( - StructField("i", IntegerType), - StructField("not_exist_col", dt, nullable = nullable) - )) - val expected = if (nullable) { - Seq(InternalRow(1, null)) - } else { - Seq(InternalRow(1, defaultValue.getOrElse(dt, null))) + def assertAction(nullable: Boolean)(action: => Unit): Unit = { + if (nullable) { + action + } else { + assertThrows[IllegalArgumentException] { + action } + } + } + Seq(true, false).foreach { nullable => + val schema = StructType(Seq( + StructField("i", IntegerType), + StructField("not_exist_col", IntegerType, nullable = nullable) + )) + val expected = Seq(InternalRow(1, null)) + assertAction(nullable) { check(schema = schema, filters = Seq(EqualTo("i", 1)), expected = expected) } } @@ -102,23 +83,31 @@ class JacksonParserSuite extends SparkFunSuite { // filter by not exist column Seq(true, false).foreach { nullable => val s = schema(nullable) - check(schema = s, filters = Seq(EqualTo("not_exist_col", 1)), expected = Seq.empty) - - val expected1 = if (nullable) Seq.empty else Seq(InternalRow(1, 0)) - check(schema = s, filters = Seq(EqualTo("not_exist_col", 0)), expected = expected1) - - val expected2 = if (nullable) Seq.empty else Seq(InternalRow(1, 0)) - check(schema = s, filters = Seq(IsNotNull("not_exist_col")), expected = expected2) + assertAction(nullable) { + check(schema = s, filters = Seq(EqualTo("not_exist_col", 1)), expected = Seq.empty) + } + assertAction(nullable) { + check(schema = s, filters = Seq(EqualTo("not_exist_col", 0)), expected = Seq.empty) + } + assertAction(nullable) { + check(schema = s, filters = Seq(IsNotNull("not_exist_col")), expected = Seq.empty) + } + assertAction(nullable) { + check(schema = s, filters = Seq(IsNull("not_exist_col")), + expected = Seq(InternalRow(1, null))) + } - val expected3 = if (nullable) Seq(InternalRow(1, null)) else Seq.empty - check(schema = s, filters = Seq(IsNull("not_exist_col")), expected = expected3) + } - val input = """{"a": 1, "b": null}""" + val input = """{"a": 1, "b": null}""" + // filter by null value column + Seq(true, false).foreach { nullable => val s2 = StructType(Seq( StructField("a", IntegerType), StructField("b", IntegerType, nullable = nullable))) - val expected4 = if (nullable) Seq.empty else Seq(InternalRow(1, 0)) - check(input = input, schema = s2, filters = Seq(IsNotNull("b")), expected = expected4) + assertAction(nullable) { + check(input = input, schema = s2, filters = Seq(IsNotNull("b")), expected = Seq.empty) + } val expected5 = if (nullable) Seq(InternalRow(1, null)) else Seq.empty check(input 
= input, schema = s2, filters = Seq(IsNull("b")), expected = expected5) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index dab1255eeab32..f60e4824dad4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2919,6 +2919,50 @@ abstract class JsonSuite } } } + + test("SPARK-35912: nullability with different parse mode -- struct") { + val input = + """ + |{ + | "c1": { + | "c2": 1 + | } + |} + |""".stripMargin + val json = spark.createDataset(spark.sparkContext.parallelize(input :: Nil))(Encoders.STRING) + + val load = (mode: String, schema: StructType) => { + spark.read + .option("mode", mode) + .schema(schema) + .json(json) + } + + Seq(true, false).foreach { nullable => + val schema = StructType(Seq( + StructField("c1", + StructType(Seq( + StructField("c2", IntegerType, nullable = false), + StructField("not_exist_col", IntegerType, nullable = false) + )), + nullable = nullable)) + ) + + checkAnswer(load("DROPMALFORMED", schema), Seq.empty) + + val exception = intercept[SparkException] { + load("FAILFAST", schema).collect + }.getMessage + assert(exception.contains( + "the null value found when parsing non-nullable field not_exist_col.")) + + val e = intercept[SparkException] { + load("PERMISSIVE", schema).collect() + } + assert(e.getMessage.contains( + "the null value found when parsing non-nullable field not_exist_col.")) + } + } } class JsonV1Suite extends JsonSuite { From b1d749fa9bcce59afb2c54a21f422e7bff3fa261 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Wed, 7 Jul 2021 16:24:42 +0800 Subject: [PATCH 03/14] fix test --- .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 6 +++--- .../apache/spark/sql/catalyst/util/BadRecordException.scala | 5 +++++ .../apache/spark/sql/catalyst/json/JacksonParserSuite.scala | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index e1bf3b9517994..85fe0e2471b10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -411,7 +411,7 @@ class JacksonParser( checkedIndexSet += index } catch { case e: SparkUpgradeException => throw e - case e: IllegalArgumentException => throw e + case e: IllegalSchemaArgumentException => throw e case NonFatal(e) if isRoot => badRecordException = badRecordException.orElse(Some(e)) parser.skipChildren() @@ -425,7 +425,7 @@ class JacksonParser( var index = 0 while (badRecordException.isEmpty && !skipRow && index < schema.length) { if (!schema(index).nullable && row.isNullAt(index)) { - throw new IllegalArgumentException( + throw new IllegalSchemaArgumentException( s"the null value found when parsing non-nullable field ${schema(index).name}.") } if (!checkedIndexSet.contains(index)) { @@ -499,7 +499,7 @@ class JacksonParser( } } catch { case e: SparkUpgradeException => throw e - case e: IllegalArgumentException => throw e + case e: IllegalSchemaArgumentException => throw e case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) => // JSON parser currently doesn't support partial results for 
corrupted records.
       // For such records, all fields other than the field configured by
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index d719a33929fcc..0a07a6c9c4862 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -41,3 +41,8 @@ case class BadRecordException(
     record: () => UTF8String,
     partialResult: () => Option[InternalRow],
     cause: Throwable) extends Exception(cause)
+
+/**
+ * Exception thrown when the actual value is null but the schema field is set to non-nullable.
+ */
+case class IllegalSchemaArgumentException(message: String) extends Exception(message)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala
index 86bc1230a28c2..a2daac9c214bc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.json
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.IllegalSchemaArgumentException
 import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull, IsNull, StringStartsWith}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -60,7 +61,7 @@ class JacksonParserSuite extends SparkFunSuite {
       if (nullable) {
         action
       } else {
-        assertThrows[IllegalArgumentException] {
+        assertThrows[IllegalSchemaArgumentException] {
           action
         }
       }

From 48b8006861dafa44a62d7b91b00259d16398f817 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Fri, 9 Jul 2021 14:25:25 +0800
Subject: [PATCH 04/14] update test

---
 .../sql/catalyst/json/JacksonParser.scala     |  7 +-
 .../catalyst/json/JacksonParserSuite.scala    | 72 +++++++++----------
 .../datasources/json/JsonSuite.scala          | 57 ++++++++-------
 3 files changed, 70 insertions(+), 66 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 85fe0e2471b10..e213db12d8a2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -406,7 +406,12 @@ class JacksonParser(
     schema.getFieldIndex(parser.getCurrentName) match {
       case Some(index) =>
         try {
-          row.update(index, fieldConverters(index).apply(parser))
+          val fieldValue = fieldConverters(index).apply(parser)
+          if (!schema(index).nullable && fieldValue == null) {
+            throw new IllegalSchemaArgumentException(
+              s"the null value found when parsing non-nullable field ${schema(index).name}.")
+          }
+          row.update(index, fieldValue)
           skipRow = structFilters.skipRow(row, index)
           checkedIndexSet += index
         } catch {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala
index a2daac9c214bc..8b296511cd10a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala @@ -61,56 +61,52 @@ class JacksonParserSuite extends SparkFunSuite { if (nullable) { action } else { - assertThrows[IllegalSchemaArgumentException] { + val msg = intercept[IllegalSchemaArgumentException] { action - } + }.message + assert(msg.contains("the null value found when parsing non-nullable field c2.")) } } + + val missingFieldInput = """{"c1":1}""" + val nullValueInput = """{"c1": 1, "c2": null}""" Seq(true, false).foreach { nullable => val schema = StructType(Seq( - StructField("i", IntegerType), - StructField("not_exist_col", IntegerType, nullable = nullable) + StructField("c1", IntegerType), + StructField("c2", IntegerType, nullable = nullable) )) val expected = Seq(InternalRow(1, null)) - assertAction(nullable) { - check(schema = schema, filters = Seq(EqualTo("i", 1)), expected = expected) - } - } - - val schema = (nullable: Boolean) => StructType(Seq( - StructField("i", IntegerType), - StructField("not_exist_col", IntegerType, nullable = nullable) - )) - // filter by not exist column - Seq(true, false).foreach { nullable => - val s = schema(nullable) - assertAction(nullable) { - check(schema = s, filters = Seq(EqualTo("not_exist_col", 1)), expected = Seq.empty) - } - assertAction(nullable) { - check(schema = s, filters = Seq(EqualTo("not_exist_col", 0)), expected = Seq.empty) - } - assertAction(nullable) { - check(schema = s, filters = Seq(IsNotNull("not_exist_col")), expected = Seq.empty) - } - assertAction(nullable) { - check(schema = s, filters = Seq(IsNull("not_exist_col")), - expected = Seq(InternalRow(1, null))) + Seq(missingFieldInput, nullValueInput).foreach { input => + assertAction(nullable) { + check(input = input, schema = schema, filters = Seq.empty, expected = expected) + } } - } - val input = """{"a": 1, "b": null}""" - // filter by null value column + // filter by not exist field and filter by null value field. 
Seq(true, false).foreach { nullable =>
+      val schema = StructType(Seq(
+        StructField("c1", IntegerType),
+        StructField("c2", IntegerType, nullable = nullable)
+      ))
+      Seq(missingFieldInput, nullValueInput).foreach { input =>
+        assertAction(nullable) {
+          check(input = input, schema = schema, filters = Seq(EqualTo("c2", 1)),
+            expected = Seq.empty)
+        }
+        assertAction(nullable) {
+          check(input = input, schema = schema, filters = Seq(EqualTo("c2", 0)),
+            expected = Seq.empty)
+        }
+        assertAction(nullable) {
+          check(input = input, schema = schema, filters = Seq(IsNotNull("c2")),
+            expected = Seq.empty)
+        }
+        assertAction(nullable) {
+          check(input = input, schema = schema, filters = Seq(IsNull("c2")),
+            expected = Seq(InternalRow(1, null)))
+        }
       }
-      val expected5 = if (nullable) Seq(InternalRow(1, null)) else Seq.empty
-      check(input = input, schema = s2, filters = Seq(IsNull("b")), expected = expected5)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index f60e4824dad4d..0c3401656c966 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2921,46 +2921,49 @@ abstract class JsonSuite
   }
 
   test("SPARK-35912: nullability with different parse mode -- struct") {
-    val input =
+    // JSON field is missing.
+    val input = """{"c1": 1}"""
+    // JSON field is null.
+ val input2 = """ |{ - | "c1": { - | "c2": 1 - | } + | "c1": 1, + | "c2": null |} |""".stripMargin - val json = spark.createDataset(spark.sparkContext.parallelize(input :: Nil))(Encoders.STRING) - val load = (mode: String, schema: StructType) => { + val load = (mode: String, schema: StructType, inputJson: String) => { + val json = spark.createDataset( + spark.sparkContext.parallelize(inputJson :: Nil))(Encoders.STRING) spark.read .option("mode", mode) .schema(schema) .json(json) } - Seq(true, false).foreach { nullable => val schema = StructType(Seq( - StructField("c1", - StructType(Seq( - StructField("c2", IntegerType, nullable = false), - StructField("not_exist_col", IntegerType, nullable = false) - )), - nullable = nullable)) - ) - - checkAnswer(load("DROPMALFORMED", schema), Seq.empty) - - val exception = intercept[SparkException] { - load("FAILFAST", schema).collect - }.getMessage - assert(exception.contains( - "the null value found when parsing non-nullable field not_exist_col.")) - - val e = intercept[SparkException] { - load("PERMISSIVE", schema).collect() + StructField("c1", IntegerType, nullable = false), + StructField("c2", IntegerType, nullable = nullable))) + + Seq(input, input2).foreach { jsonString => + if (nullable) { + checkAnswer(load("DROPMALFORMED", schema, jsonString), Row(1, null) :: Nil) + checkAnswer(load("FAILFAST", schema, jsonString), Row(1, null) :: Nil) + checkAnswer(load("PERMISSIVE", schema, jsonString), Row(1, null) :: Nil) + } else { + checkAnswer(load("DROPMALFORMED", schema, jsonString), Seq.empty) + val exceptionMsg1 = intercept[SparkException] { + load("FAILFAST", schema, jsonString).collect + }.getMessage + assert(exceptionMsg1.contains( + "the null value found when parsing non-nullable field c2.")) + val exceptionMsg2 = intercept[SparkException] { + load("PERMISSIVE", schema, jsonString).collect + } + assert(exceptionMsg2.getMessage.contains( + "the null value found when parsing non-nullable field c2.")) + } } - assert(e.getMessage.contains( - "the null value found when parsing non-nullable field not_exist_col.")) } } } From e44348dc7a2c68e8366184063de093ccbc7969b5 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Mon, 12 Jul 2021 10:34:59 +0800 Subject: [PATCH 05/14] fail if the mode is PERMISSIVE and schema contains non-nullable fields. 
--- .../sql/catalyst/json/JacksonParser.scala | 4 +-- .../sql/catalyst/util/FailureSafeParser.scala | 21 ++++++++++++++- .../catalyst/json/JacksonParserSuite.scala | 24 ++++++++++------- .../datasources/json/JsonSuite.scala | 26 ++++++++++++------- 4 files changed, 54 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index e213db12d8a2e..2dbaa9ab28d9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -409,7 +409,7 @@ class JacksonParser( val fieldValue = fieldConverters(index).apply(parser) if (!schema(index).nullable && fieldValue == null) { throw new IllegalSchemaArgumentException( - s"the null value found when parsing non-nullable field ${schema(index).name}.") + s"field ${schema(index).name} is not nullable but the parsed value is null.") } row.update(index, fieldValue) skipRow = structFilters.skipRow(row, index) @@ -431,7 +431,7 @@ class JacksonParser( while (badRecordException.isEmpty && !skipRow && index < schema.length) { if (!schema(index).nullable && row.isNullAt(index)) { throw new IllegalSchemaArgumentException( - s"the null value found when parsing non-nullable field ${schema(index).name}.") + s"field ${schema(index).name} is not nullable but it's missing in one record.") } if (!checkedIndexSet.contains(index)) { skipRow = structFilters.skipRow(row, index) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index ac29e86fc2138..1be0938e8f1b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( @@ -29,11 +29,30 @@ class FailureSafeParser[IN]( schema: StructType, columnNameOfCorruptRecord: String) { + disableNotNullableForPermissiveMode private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) private val resultRow = new GenericInternalRow(schema.length) private val nullResult = new GenericInternalRow(schema.length) + // As PERMISSIVE mode should not fail at runtime, so fail if the mode is PERMISSIVE and schema + // contains non-nullable fields. 
+ private def disableNotNullableForPermissiveMode: Unit = { + def checkNotNullableRecursively(schema: StructType): Unit = { + schema.fields.foreach { + case _ @ StructField(name, _, nullable, _) if (!nullable) => + throw new IllegalSchemaArgumentException(s"field ${name} is not nullable but the " + + "not nullable field is not allowed in PERMISSIVE mode.") + case _ @ StructField(_, dt: StructType, _, _) => checkNotNullableRecursively(dt) + case _ => + } + } + mode match { + case PermissiveMode => checkNotNullableRecursively(schema) + case _ => + } + } + // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala index 8b296511cd10a..0cec57fd3ceaf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala @@ -57,19 +57,25 @@ class JacksonParserSuite extends SparkFunSuite { } test("35912: nullability with different schema nullable setting") { - def assertAction(nullable: Boolean)(action: => Unit): Unit = { + val missingFieldInput = """{"c1":1}""" + val nullValueInput = """{"c1": 1, "c2": null}""" + + def assertAction(nullable: Boolean, input: String)(action: => Unit): Unit = { if (nullable) { action } else { val msg = intercept[IllegalSchemaArgumentException] { action }.message - assert(msg.contains("the null value found when parsing non-nullable field c2.")) + val expected = if (input == missingFieldInput) { + "field c2 is not nullable but it's missing in one record." + } else { + s"field c2 is not nullable but the parsed value is null." 
+ } + assert(msg.contains(expected)) } } - val missingFieldInput = """{"c1":1}""" - val nullValueInput = """{"c1": 1, "c2": null}""" Seq(true, false).foreach { nullable => val schema = StructType(Seq( StructField("c1", IntegerType), @@ -77,7 +83,7 @@ class JacksonParserSuite extends SparkFunSuite { )) val expected = Seq(InternalRow(1, null)) Seq(missingFieldInput, nullValueInput).foreach { input => - assertAction(nullable) { + assertAction(nullable, input) { check(input = input, schema = schema, filters = Seq.empty, expected = expected) } } @@ -90,19 +96,19 @@ class JacksonParserSuite extends SparkFunSuite { StructField("c2", IntegerType, nullable = nullable) )) Seq(missingFieldInput, nullValueInput).foreach { input => - assertAction(nullable) { + assertAction(nullable, input) { check(input = input, schema = schema, filters = Seq(EqualTo("c2", 1)), expected = Seq.empty) } - assertAction(nullable) { + assertAction(nullable, input) { check(input = input, schema = schema, filters = Seq(EqualTo("c2", 0)), expected = Seq.empty) } - assertAction(nullable) { + assertAction(nullable, input) { check(input = input, schema = schema, filters = Seq(IsNotNull("c2")), expected = Seq.empty) } - assertAction(nullable) { + assertAction(nullable, input) { check(input = input, schema = schema, filters = Seq(IsNull("c2")), expected = Seq(InternalRow(1, null))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0c3401656c966..320d3b1846561 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2922,9 +2922,9 @@ abstract class JsonSuite test("SPARK-35912: nullability with different parse mode -- struct") { // JSON field is missing. - val input = """{"c1": 1}""" + val missingFieldInput = """{"c1": 1}""" // JSON filed is null. - val input2 = + val nullValueInput = """ |{ | "c1": 1, @@ -2940,28 +2940,36 @@ abstract class JsonSuite .schema(schema) .json(json) } + Seq(true, false).foreach { nullable => val schema = StructType(Seq( - StructField("c1", IntegerType, nullable = false), - StructField("c2", IntegerType, nullable = nullable))) + StructField("c1", IntegerType, nullable = true), + StructField("c2", IntegerType, nullable = nullable))) - Seq(input, input2).foreach { jsonString => + Seq(missingFieldInput, nullValueInput).foreach { jsonString => if (nullable) { checkAnswer(load("DROPMALFORMED", schema, jsonString), Row(1, null) :: Nil) checkAnswer(load("FAILFAST", schema, jsonString), Row(1, null) :: Nil) checkAnswer(load("PERMISSIVE", schema, jsonString), Row(1, null) :: Nil) } else { checkAnswer(load("DROPMALFORMED", schema, jsonString), Seq.empty) + val exceptionMsg1 = intercept[SparkException] { load("FAILFAST", schema, jsonString).collect }.getMessage - assert(exceptionMsg1.contains( - "the null value found when parsing non-nullable field c2.")) + val expectedMsg1 = if (jsonString == missingFieldInput) { + "field c2 is not nullable but it's missing in one record." + } else { + s"field c2 is not nullable but the parsed value is null." 
+ } + assert(exceptionMsg1.contains(expectedMsg1)) + val exceptionMsg2 = intercept[SparkException] { load("PERMISSIVE", schema, jsonString).collect } - assert(exceptionMsg2.getMessage.contains( - "the null value found when parsing non-nullable field c2.")) + val expectedMsg2 = + "field c2 is not nullable but the not nullable field is not allowed in PERMISSIVE mode." + assert(exceptionMsg2.getMessage.contains(expectedMsg2)) } } } From 3cfc5833f901323c7a896cd4370658d7afe6593e Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Mon, 12 Jul 2021 13:31:42 +0800 Subject: [PATCH 06/14] fix test --- .../scala/org/apache/spark/sql/UserDefinedTypeSuite.scala | 8 ++++---- .../spark/sql/execution/datasources/json/JsonSuite.scala | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index cc52b6d8a14a7..4b779ef6d52f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -147,8 +147,8 @@ class UserDefinedTypeSuite extends QueryTest with SharedSparkSession with Parque "{\"id\":2,\"vec\":[2.25,4.5,8.75]}" ) val schema = StructType(Seq( - StructField("id", IntegerType, false), - StructField("vec", new TestUDT.MyDenseVectorUDT, false) + StructField("id", IntegerType, true), + StructField("vec", new TestUDT.MyDenseVectorUDT, true) )) val jsonRDD = spark.read.schema(schema).json(data.toDS()) @@ -167,8 +167,8 @@ class UserDefinedTypeSuite extends QueryTest with SharedSparkSession with Parque ) val schema = StructType(Seq( - StructField("id", IntegerType, false), - StructField("vec", new TestUDT.MyDenseVectorUDT, false) + StructField("id", IntegerType, true), + StructField("vec", new TestUDT.MyDenseVectorUDT, true) )) val jsonDataset = spark.read.schema(schema).json(data.toDS()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 320d3b1846561..9bc845dd7036c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -924,7 +924,7 @@ abstract class JsonSuite test("Applying schemas with MapType") { withTempView("jsonWithSimpleMap", "jsonWithComplexMap") { val schemaWithSimpleMap = StructType( - StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) + StructField("map", MapType(StringType, IntegerType, true), true) :: Nil) val jsonWithSimpleMap = spark.read.schema(schemaWithSimpleMap).json(mapType1) jsonWithSimpleMap.createOrReplaceTempView("jsonWithSimpleMap") @@ -953,7 +953,7 @@ abstract class JsonSuite StructField("field1", ArrayType(IntegerType, true), true) :: StructField("field2", IntegerType, true) :: Nil) val schemaWithComplexMap = StructType( - StructField("map", MapType(StringType, innerStruct, true), false) :: Nil) + StructField("map", MapType(StringType, innerStruct, true), true) :: Nil) val jsonWithComplexMap = spark.read.schema(schemaWithComplexMap).json(mapType2) @@ -1392,7 +1392,7 @@ abstract class JsonSuite withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { withTempDir { dir => val schemaWithSimpleMap = StructType( - StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) + StructField("map", 
MapType(StringType, IntegerType, true), true) :: Nil) val df = spark.read.schema(schemaWithSimpleMap).json(mapType1) val path = dir.getAbsolutePath From 51923c5998f17a9ee17e0eaf5f958e349d126be9 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Mon, 12 Jul 2021 15:14:13 +0800 Subject: [PATCH 07/14] nit --- .../catalyst/json/JacksonParserSuite.scala | 43 ++++++++++--------- .../datasources/json/JsonSuite.scala | 8 +--- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala index 0cec57fd3ceaf..371b273c633c8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala @@ -25,6 +25,9 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String class JacksonParserSuite extends SparkFunSuite { + private val missingFieldInput = """{"c1":1}""" + private val nullValueInput = """{"c1": 1, "c2": null}""" + private def check( input: String = """{"i":1, "s": "a"}""", schema: StructType = StructType.fromDDL("i INTEGER"), @@ -37,6 +40,22 @@ class JacksonParserSuite extends SparkFunSuite { assert(actual === expected) } + private def assertAction(nullable: Boolean, input: String)(action: => Unit): Unit = { + if (nullable) { + action + } else { + val msg = intercept[IllegalSchemaArgumentException] { + action + }.message + val expected = if (input == missingFieldInput) { + "field c2 is not nullable but it's missing in one record." + } else { + "field c2 is not nullable but the parsed value is null." + } + assert(msg.contains(expected)) + } + } + test("skipping rows using pushdown filters") { check(filters = Seq(), expected = Seq(InternalRow(1))) check(filters = Seq(EqualTo("i", 1)), expected = Seq(InternalRow(1))) @@ -56,26 +75,7 @@ class JacksonParserSuite extends SparkFunSuite { expected = Seq(InternalRow(1, 3.14))) } - test("35912: nullability with different schema nullable setting") { - val missingFieldInput = """{"c1":1}""" - val nullValueInput = """{"c1": 1, "c2": null}""" - - def assertAction(nullable: Boolean, input: String)(action: => Unit): Unit = { - if (nullable) { - action - } else { - val msg = intercept[IllegalSchemaArgumentException] { - action - }.message - val expected = if (input == missingFieldInput) { - "field c2 is not nullable but it's missing in one record." - } else { - s"field c2 is not nullable but the parsed value is null." - } - assert(msg.contains(expected)) - } - } - + test("SPARK-35912: nullability with different schema nullable setting") { Seq(true, false).foreach { nullable => val schema = StructType(Seq( StructField("c1", IntegerType), @@ -88,8 +88,9 @@ class JacksonParserSuite extends SparkFunSuite { } } } + } - // filter by not exist field and filter by null value field. 
+ test("SPARK-35912: skipping rows with not exist field and null value field") { Seq(true, false).foreach { nullable => val schema = StructType(Seq( StructField("c1", IntegerType), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 9bc845dd7036c..261346df17448 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2924,13 +2924,7 @@ abstract class JsonSuite // JSON field is missing. val missingFieldInput = """{"c1": 1}""" // JSON filed is null. - val nullValueInput = - """ - |{ - | "c1": 1, - | "c2": null - |} - |""".stripMargin + val nullValueInput = """{"c1": 1, "c2": null}""" val load = (mode: String, schema: StructType, inputJson: String) => { val json = spark.createDataset( From 3f3339c8dfb41005baf7ea8749c74e4c61b9ddd5 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Mon, 12 Jul 2021 19:22:14 +0800 Subject: [PATCH 08/14] fix test --- .../test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala index 25b8849d61248..9555317f46bbe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala @@ -182,8 +182,8 @@ class DeprecatedAPISuite extends QueryTest with SharedSparkSession { jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD()) checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil) - schema = StructType(StructField("name", StringType, false) :: - StructField("age", IntegerType, false) :: Nil) + schema = StructType(StructField("name", StringType, true) :: + StructField("age", IntegerType, true) :: Nil) jsonDF = sqlContext.jsonRDD(jsonRDD, schema) checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil) jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), schema) From 4f2e9049312a015079cde379d4143376138b3687 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Tue, 13 Jul 2021 10:06:04 +0800 Subject: [PATCH 09/14] nit --- .../spark/sql/catalyst/util/FailureSafeParser.scala | 8 ++++---- .../spark/sql/execution/datasources/json/JsonSuite.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 1be0938e8f1b0..d7491e194f1fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -29,7 +29,7 @@ class FailureSafeParser[IN]( schema: StructType, columnNameOfCorruptRecord: String) { - disableNotNullableForPermissiveMode + disableNotNullableForPermissiveMode() private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) private val resultRow = new GenericInternalRow(schema.length) @@ -37,12 +37,12 @@ class FailureSafeParser[IN]( // As PERMISSIVE mode should not fail at runtime, so fail if the mode is PERMISSIVE and schema // contains non-nullable fields. 
- private def disableNotNullableForPermissiveMode: Unit = { + private def disableNotNullableForPermissiveMode(): Unit = { def checkNotNullableRecursively(schema: StructType): Unit = { schema.fields.foreach { case _ @ StructField(name, _, nullable, _) if (!nullable) => - throw new IllegalSchemaArgumentException(s"field ${name} is not nullable but the " + - "not nullable field is not allowed in PERMISSIVE mode.") + throw new IllegalSchemaArgumentException(s"Field ${name} is not nullable but " + + "PERMISSIVE mode only works with nullable fields.") case _ @ StructField(_, dt: StructType, _, _) => checkNotNullableRecursively(dt) case _ => } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 261346df17448..d16dfc7a7bdc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2962,7 +2962,7 @@ abstract class JsonSuite load("PERMISSIVE", schema, jsonString).collect } val expectedMsg2 = - "field c2 is not nullable but the not nullable field is not allowed in PERMISSIVE mode." + "Field c2 is not nullable but PERMISSIVE mode only works with nullable fields." assert(exceptionMsg2.getMessage.contains(expectedMsg2)) } } From 0f5354b2212fec4ebe4d218e833f53f29576bfeb Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Wed, 14 Jul 2021 20:28:08 +0800 Subject: [PATCH 10/14] review --- .../sql/catalyst/json/JacksonParser.scala | 23 ++--- .../catalyst/json/JacksonParserSuite.scala | 83 +++++++------------ 2 files changed, 43 insertions(+), 63 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 2dbaa9ab28d9b..5f8ebbdcbb98d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -399,7 +399,6 @@ class JacksonParser( val row = new GenericInternalRow(schema.length) var badRecordException: Option[Throwable] = None var skipRow = false - var checkedIndexSet = Set.empty[Int] structFilters.reset() while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) { @@ -413,7 +412,6 @@ class JacksonParser( } row.update(index, fieldValue) skipRow = structFilters.skipRow(row, index) - checkedIndexSet += index } catch { case e: SparkUpgradeException => throw e case e: IllegalSchemaArgumentException => throw e @@ -427,16 +425,19 @@ class JacksonParser( } // When the input schema is setting to `nullable = false`, make sure the field is not null. - var index = 0 - while (badRecordException.isEmpty && !skipRow && index < schema.length) { - if (!schema(index).nullable && row.isNullAt(index)) { - throw new IllegalSchemaArgumentException( - s"field ${schema(index).name} is not nullable but it's missing in one record.") - } - if (!checkedIndexSet.contains(index)) { - skipRow = structFilters.skipRow(row, index) + // As PERMISSIVE mode only works with nullable fields, we can skip this not nullable check when + // the mode is PERMISSIVE. 
(see FailureSafeParser.disableNotNullableForPermissiveMode) + val checkNotNullable = + badRecordException.isEmpty && !skipRow && options.parseMode != PermissiveMode + if (checkNotNullable) { + var index = 0 + while (index < schema.length) { + if (!schema(index).nullable && row.isNullAt(index)) { + throw new IllegalSchemaArgumentException( + s"field ${schema(index).name} is not nullable but it's missing in one record.") + } + index += 1 } - index += 1 } if (skipRow) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala index 371b273c633c8..e32a03038af6c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala @@ -20,42 +20,25 @@ package org.apache.spark.sql.catalyst.json import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.IllegalSchemaArgumentException -import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull, IsNull, StringStartsWith} +import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String class JacksonParserSuite extends SparkFunSuite { - private val missingFieldInput = """{"c1":1}""" - private val nullValueInput = """{"c1": 1, "c2": null}""" private def check( input: String = """{"i":1, "s": "a"}""", schema: StructType = StructType.fromDDL("i INTEGER"), filters: Seq[Filter], + config: Map[String, String] = Map.empty, expected: Seq[InternalRow]): Unit = { - val options = new JSONOptions(Map.empty[String, String], "GMT", "") + val options = new JSONOptions(config, "GMT", "") val parser = new JacksonParser(schema, options, false, filters) val createParser = CreateJacksonParser.string _ val actual = parser.parse(input, createParser, UTF8String.fromString) assert(actual === expected) } - private def assertAction(nullable: Boolean, input: String)(action: => Unit): Unit = { - if (nullable) { - action - } else { - val msg = intercept[IllegalSchemaArgumentException] { - action - }.message - val expected = if (input == missingFieldInput) { - "field c2 is not nullable but it's missing in one record." - } else { - "field c2 is not nullable but the parsed value is null." 
- } - assert(msg.contains(expected)) - } - } - test("skipping rows using pushdown filters") { check(filters = Seq(), expected = Seq(InternalRow(1))) check(filters = Seq(EqualTo("i", 1)), expected = Seq(InternalRow(1))) @@ -76,42 +59,38 @@ class JacksonParserSuite extends SparkFunSuite { } test("SPARK-35912: nullability with different schema nullable setting") { - Seq(true, false).foreach { nullable => - val schema = StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", IntegerType, nullable = nullable) - )) - val expected = Seq(InternalRow(1, null)) - Seq(missingFieldInput, nullValueInput).foreach { input => - assertAction(nullable, input) { - check(input = input, schema = schema, filters = Seq.empty, expected = expected) + val missingFieldInput = """{"c1":1}""" + val nullValueInput = """{"c1": 1, "c2": null}""" + + def assertAction(nullable: Boolean, input: String)(action: => Unit): Unit = { + if (nullable) { + action + } else { + val msg = intercept[IllegalSchemaArgumentException] { + action + }.message + val expected = if (input == missingFieldInput) { + "field c2 is not nullable but it's missing in one record." + } else { + "field c2 is not nullable but the parsed value is null." } + assert(msg.contains(expected)) } } - } - test("SPARK-35912: skipping rows with not exist field and null value field") { - Seq(true, false).foreach { nullable => - val schema = StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", IntegerType, nullable = nullable) - )) - Seq(missingFieldInput, nullValueInput).foreach { input => - assertAction(nullable, input) { - check(input = input, schema = schema, filters = Seq(EqualTo("c2", 1)), - expected = Seq.empty) - } - assertAction(nullable, input) { - check(input = input, schema = schema, filters = Seq(EqualTo("c2", 0)), - expected = Seq.empty) - } - assertAction(nullable, input) { - check(input = input, schema = schema, filters = Seq(IsNotNull("c2")), - expected = Seq.empty) - } - assertAction(nullable, input) { - check(input = input, schema = schema, filters = Seq(IsNull("c2")), - expected = Seq(InternalRow(1, null))) + Seq("FAILFAST", "DROPMALFORMED").foreach { mode => + val config = Map("mode" -> mode) + Seq(true, false).foreach { nullable => + val schema = StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", IntegerType, nullable = nullable) + )) + val expected = Seq(InternalRow(1, null)) + Seq(missingFieldInput, nullValueInput).foreach { input => + assertAction(nullable, input) { + check(input = input, schema = schema, filters = Seq.empty, config = config, + expected = expected) + } } } } From 9b08de1309df046cda5d3e9f3d7ee838516d6bc3 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Wed, 14 Jul 2021 23:11:48 +0800 Subject: [PATCH 11/14] review --- .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 2 +- .../apache/spark/sql/catalyst/util/FailureSafeParser.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 5f8ebbdcbb98d..e0e00fb6371fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -426,7 +426,7 @@ class JacksonParser( // When the input schema is setting to `nullable = false`, make sure the field is not null. 
  // As PERMISSIVE mode only works with nullable fields, we can skip this not nullable check when
-  // the mode is PERMISSIVE. (see FailureSafeParser.disableNotNullableForPermissiveMode)
+  // the mode is PERMISSIVE. (see FailureSafeParser.checkNullabilityForPermissiveMode)
   val checkNotNullable =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index d7491e194f1fd..b8a71cc3b04a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -29,7 +29,7 @@ class FailureSafeParser[IN](
     schema: StructType,
     columnNameOfCorruptRecord: String) {
 
-  disableNotNullableForPermissiveMode()
+  checkNullabilityForPermissiveMode()
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
   private val resultRow = new GenericInternalRow(schema.length)
@@ -37,7 +37,7 @@ class FailureSafeParser[IN](
 
   // As PERMISSIVE mode should not fail at runtime, so fail if the mode is PERMISSIVE and schema
   // contains non-nullable fields.
-  private def disableNotNullableForPermissiveMode(): Unit = {
+  private def checkNullabilityForPermissiveMode(): Unit = {
     def checkNotNullableRecursively(schema: StructType): Unit = {
       schema.fields.foreach {
         case _ @ StructField(name, _, nullable, _) if (!nullable) =>
@@ -86,7 +86,7 @@ class FailureSafeParser[IN](
         case FailFastMode =>
           throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(e)
       }
-      case _ if (mode == DropMalformedMode) =>
+      case _: IllegalSchemaArgumentException if (mode == DropMalformedMode) =>
         Iterator.empty
     }
   }

From 484a17e3928e1c574fd8e208ecc689c3b81b099a Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Thu, 15 Jul 2021 10:26:29 +0800
Subject: [PATCH 12/14] add migration guide

---
 docs/sql-migration-guide.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index dc9a49e69aa5a..72fc74e87a48d 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}
 
+## Upgrading from Spark SQL 3.2 to 3.3
+
+  - In Spark 3.3, Spark fails when parsing a JSON/CSV string in `PERMISSIVE` mode if the schema contains non-nullable fields. Set the mode to `FAILFAST` or `DROPMALFORMED` if you want to read JSON/CSV with a schema that contains non-nullable fields.
+
 ## Upgrading from Spark SQL 3.1 to 3.2
 
 - Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespaces.
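For reference, the behavior the migration note above describes can be reproduced end to end. A minimal sketch, assuming the series is applied as-is; the object name, local master, and session setup are illustrative and not part of the patches:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object NonNullableJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    // "c2" is declared non-nullable but missing from the input record.
    val schema = StructType(Seq(
      StructField("c1", IntegerType, nullable = true),
      StructField("c2", IntegerType, nullable = false)))
    val data = Seq("""{"c1": 1}""").toDS()

    // DROPMALFORMED drops the offending record, so the result is empty.
    spark.read.schema(schema).option("mode", "DROPMALFORMED").json(data).show()

    // FAILFAST raises "field c2 is not nullable but it's missing in one record."
    // PERMISSIVE (the default) fails up front, because the series rejects
    // non-nullable fields in PERMISSIVE mode before any record is parsed.
    spark.stop()
  }
}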
From 7a6d978934d44f91e37f2cccebaaff1e885f9041 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Fri, 16 Jul 2021 16:09:25 +0800
Subject: [PATCH 13/14] review

---
 .../sql/catalyst/json/JacksonParser.scala     | 41 ++++++++++++-------
 .../sql/catalyst/util/FailureSafeParser.scala |  2 +-
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index e0e00fb6371fe..83cc812d81f92 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -406,7 +406,9 @@ class JacksonParser(
       case Some(index) =>
         try {
           val fieldValue = fieldConverters(index).apply(parser)
-          if (!schema(index).nullable && fieldValue == null) {
+          val isIllegal =
+            options.parseMode != PermissiveMode && !schema(index).nullable && fieldValue == null
+          if (isIllegal) {
             throw new IllegalSchemaArgumentException(
               s"field ${schema(index).name} is not nullable but the parsed value is null.")
           }
@@ -425,20 +427,7 @@ class JacksonParser(
     }
 
     // When the input schema is set to `nullable = false`, make sure the field is not null.
-    // As PERMISSIVE mode only works with nullable fields, we can skip this not nullable check when
-    // the mode is PERMISSIVE. (see FailureSafeParser.checkNullabilityForPermissiveMode)
-    val checkNotNullable =
-      badRecordException.isEmpty && !skipRow && options.parseMode != PermissiveMode
-    if (checkNotNullable) {
-      var index = 0
-      while (index < schema.length) {
-        if (!schema(index).nullable && row.isNullAt(index)) {
-          throw new IllegalSchemaArgumentException(
-            s"field ${schema(index).name} is not nullable but it's missing in one record.")
-        }
-        index += 1
-      }
-    }
+    checkNotNullableInRow(row, schema, skipRow, badRecordException)
 
     if (skipRow) {
       None
@@ -449,6 +438,28 @@ class JacksonParser(
     }
   }
 
+  private lazy val checkNotNullableInRow = {
+    // As PERMISSIVE mode only works with nullable fields, we can skip this not nullable check when
+    // the mode is PERMISSIVE. (see FailureSafeParser.checkNullabilityForPermissiveMode)
+    if (options.parseMode != PermissiveMode) {
+      (row: GenericInternalRow, schema: StructType, skipRow: Boolean,
+        runtimeExceptionOption: Option[Throwable]) => {
+        if (runtimeExceptionOption.isEmpty && !skipRow) {
+          var index = 0
+          while (index < schema.length) {
+            if (!schema(index).nullable && row.isNullAt(index)) {
+              throw new IllegalSchemaArgumentException(
+                s"field ${schema(index).name} is not nullable but it's missing in one record.")
+            }
+            index += 1
+          }
+        }
+      }
+    } else {
+      (_: GenericInternalRow, _: StructType, _: Boolean, _: Option[Throwable]) => {}
+    }
+  }
+
   /**
    * Parse an object as a Map, preserving all fields.
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index b8a71cc3b04a3..008ace454da1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -86,7 +86,7 @@ class FailureSafeParser[IN]( case FailFastMode => throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(e) } - case _: IllegalSchemaArgumentException if (mode == DropMalformedMode) => + case _: IllegalSchemaArgumentException if mode == DropMalformedMode => Iterator.empty } } From 9fb8fbd5598eb0d80c11bf7e129ad5c9b146f837 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Fri, 16 Jul 2021 16:12:56 +0800 Subject: [PATCH 14/14] nit --- .../sql/catalyst/json/JacksonParser.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 83cc812d81f92..8b6c211e759fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -438,26 +438,26 @@ class JacksonParser( } } - private lazy val checkNotNullableInRow = { - // As PERMISSIVE mode only works with nullable fields, we can skip this not nullable check when - // the mode is PERMISSIVE. (see FailureSafeParser.checkNullabilityForPermissiveMode) - if (options.parseMode != PermissiveMode) { - (row: GenericInternalRow, schema: StructType, skipRow: Boolean, - runtimeExceptionOption: Option[Throwable]) => { - if (runtimeExceptionOption.isEmpty && !skipRow) { - var index = 0 - while (index < schema.length) { - if (!schema(index).nullable && row.isNullAt(index)) { - throw new IllegalSchemaArgumentException( - s"field ${schema(index).name} is not nullable but it's missing in one record.") - } - index += 1 + // As PERMISSIVE mode only works with nullable fields, we can skip this not nullable check when + // the mode is PERMISSIVE. (see FailureSafeParser.checkNullabilityForPermissiveMode) + private lazy val checkNotNullableInRow = if (options.parseMode != PermissiveMode) { + (row: GenericInternalRow, + schema: StructType, + skipRow: Boolean, + runtimeExceptionOption: Option[Throwable]) => { + if (runtimeExceptionOption.isEmpty && !skipRow) { + var index = 0 + while (index < schema.length) { + if (!schema(index).nullable && row.isNullAt(index)) { + throw new IllegalSchemaArgumentException( + s"field ${schema(index).name} is not nullable but it's missing in one record.") } + index += 1 } } - } else { - (_: GenericInternalRow, _: StructType, _: Boolean, _: Option[Throwable]) => {} } + } else { + (_: GenericInternalRow, _: StructType, _: Boolean, _: Option[Throwable]) => {} } /**
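
The shape the last patch settles on resolves the nullability check once, from the parse mode, instead of re-testing the mode for every record. A standalone sketch of that pattern with simplified stand-in types (`Field`, rows as `Array[Any]`, and the plain `IllegalArgumentException` are illustrative, not the real Spark classes):

    // Stand-in for StructField: a name plus a nullability flag.
    final case class Field(name: String, nullable: Boolean)

    class RowNullabilityCheck(permissive: Boolean, schema: Seq[Field]) {
      // Chosen once at construction: a no-op for PERMISSIVE mode,
      // the real per-row scan for every other mode.
      private val checkRow: Array[Any] => Unit =
        if (permissive) {
          _ => () // PERMISSIVE never enforces nullability at parse time.
        } else {
          row => {
            var i = 0
            while (i < schema.length) {
              if (!schema(i).nullable && row(i) == null) {
                throw new IllegalArgumentException(
                  s"field ${schema(i).name} is not nullable but it's missing in one record.")
              }
              i += 1
            }
          }
        }

      def apply(row: Array[Any]): Unit = checkRow(row)
    }

This keeps the per-record parsing loop free of the mode branch, which is the point of hoisting the check into a lazily initialized function value.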