[SPARK-53535][SQL] Fix missing structs always being assumed as nulls #52557
Conversation
Wow, this has been a problem for us for so long. Especially when you read a non-nullable struct, this actually throws an NPE instead of just giving you the wrong data. Thanks for the fix!
juliuszsompolski left a comment
@gengliangwang could you take a look?
@ZiyaZa I am not a big fan of such behavior changes. Why pick just one arbitrary field instead of setting all the fields to null? Could you provide more details on this one?
For breaking changes, we should usually introduce a SQL configuration to control the new/legacy behaviors. We should also update https://spark.apache.org/docs/latest/sql-migration-guide.html
This is a behavior change that is required to fix a correctness issue. The issue is described in more detail in the linked JIRA ticket. According to the comment #52557 (comment) above, we could also get a NullPointerException if the struct is marked as non-nullable, because previously we would wrongly assume all such struct values to be null.
We need to understand, for each row, whether the struct value is null or it is a struct with all of its fields null (in JSON notation, the difference between `{"s": null}` and `{"s": {"a": null}}`).
Updated the description.
Added a flag to control this behavior.
Can you please explain how we update that? It looks like that is built from the …
Yes, please create a new section.
```scala
    .createWithDefault(true)

  val PARQUET_READ_ANY_FIELD_FOR_MISSING_STRUCT =
    buildConf("spark.sql.parquet.readAnyFieldForMissingStruct")
```
This should be a legacy config that is off by default. How about `spark.sql.legacy.returnNullStructIfAllFieldsMissing`?
I named it `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing` to show that it is for Parquet files.
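For completeness, a minimal usage sketch of the opt-out, assuming the flag ships under the name given in the reply above; `readSchema` and the path are placeholders, not values from this PR:

```scala
// Sketch only: opt back into the legacy behavior in which a struct whose requested
// fields are all missing from the Parquet file is read back as NULL.
// The flag name is the one discussed above; readSchema and the path are placeholders.
spark.conf.set("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing", "true")
val legacyDf = spark.read.schema(readSchema).parquet("/tmp/missing_col_test")
legacyDf.show()
```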
```scala
)
val df = spark.read.schema(readSchema).parquet(file)
val scanNode = df.queryExecution.executedPlan.collectLeaves().head
VerifyNoAdditionalScanOutputExec(scanNode).execute().first()
```
Shall we use the existing `ColumnarToRowExec` and then verify the rows? But I'm also fine with this custom physical plan.
I tried to use that first, but couldn't get it to work because `ColumnarToRowExec` adds its own `UnsafeProjection` with `child.output`. Let's keep the custom plan node.
```scala
val childOutputTypes = child.output.map(_.dataType)
child.executeColumnar().mapPartitionsInternal { batches =>
  batches.flatMap { input =>
    input.rowIterator().asScala
```
If we decide to keep this custom physical plan, we can simplify it by checking the columnar batches directly:
```scala
0.until(input.numCols).foreach { index =>
  assert(childOutputTypes(index) == input.column(index).dataType)
}
```
Thanks! Simplified it.
Thanks, merging to master!
### What changes were proposed in this pull request?
Currently, if all fields of a struct mentioned in the read schema are missing in a Parquet file, the reader populates the struct with nulls.
This PR modifies the scan behavior so that, if the struct exists in the Parquet schema but none of the fields from the read schema are present, we instead pick an arbitrary field from the Parquet file to read and use its definition levels to populate the struct's NULLs (as well as outer NULLs and array sizes if the struct is nested inside another nested type).
This is done by changing the schema requested by the readers. We add an additional field to the requested schema when clipping the Parquet file schema according to the Spark schema. This means that the readers actually read and return more data than requested, which can cause problems. This is only a problem for the `VectorizedParquetRecordReader`, since for the other read code path via parquet-mr, we already have an `UnsafeProjection` for outputting only requested schema fields in `ParquetFileFormat`.
To ensure `VectorizedParquetRecordReader` only returns Spark requested fields, we create the `ColumnarBatch` with vectors that match the requested schema (we get rid of the additional fields by recursively matching `sparkSchema` with `sparkRequestedSchema` and ensuring structs have the same length in both). Then `ParquetColumnVector`s are responsible for allocating dummy vectors to hold the data temporarily while reading, but these are not exposed to the outside.
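To make the vector-matching idea concrete, here is a deliberately simplified, flat (top-level only) sketch. The Spark class and method names used are real, but the helper itself and its signature are invented for illustration; the actual code matches `sparkSchema` against `sparkRequestedSchema` recursively.

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Hypothetical helper, illustration only: allocate vectors for every column we
// physically read (readSchema may contain an extra helper field), but expose only
// the columns that Spark actually requested, so the extra field never leaks out.
def exposeRequestedColumns(
    readSchema: StructType,       // schema actually read from the Parquet file
    requestedSchema: StructType,  // schema Spark asked for
    capacity: Int): ColumnarBatch = {
  val allVectors = OnHeapColumnVector.allocateColumns(capacity, readSchema)
  val exposed: Array[ColumnVector] = requestedSchema.fieldNames.map { name =>
    allVectors(readSchema.fieldIndex(name)): ColumnVector
  }
  new ColumnarBatch(exposed, 0)
}
```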
The heuristic to pick the arbitrary leaf field is as follows (a short cost sketch follows the list below): We try to minimize the number of arrays or maps (repeated fields) in the path to a leaf column, because the more repeated fields we have, the more likely we are to read a larger amount of data. At the same repetition level, we consider the type of each column to pick the cheapest column to read (struct nesting does not affect the decision here). We look at the byte size of the column type to pick the cheapest one as follows:
- BOOLEAN: 1 byte
- INT32, FLOAT: 4 bytes
- INT64, DOUBLE: 8 bytes
- INT96: 12 bytes
- BINARY, FIXED_LEN_BYTE_ARRAY, default case for future types: 32 bytes (high cost due to variable/large size)
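A minimal sketch of that byte-size comparison, written purely for illustration (the helper name is invented; the real selection logic lives in Spark's Parquet schema clipping code and is not reproduced here):

```scala
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

// Illustrative only: approximate per-value byte cost used to compare candidate
// leaf columns at the same repetition level, as described in the list above.
def approximateByteCost(typeName: PrimitiveTypeName): Int = typeName match {
  case BOOLEAN        => 1
  case INT32 | FLOAT  => 4
  case INT64 | DOUBLE => 8
  case INT96          => 12
  case _              => 32 // BINARY, FIXED_LEN_BYTE_ARRAY, and any future types
}
```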
### Why are the changes needed?
This is a bug fix: depending on the requested fields, we were incorrectly assuming non-null struct values to be missing from the file and returning nulls for them.
### Does this PR introduce _any_ user-facing change?
Yes. We previously assumed structs to be null if all the fields we were trying to read from a Parquet file were missing from that file, even if the file contained other fields whose definition levels could have been used. See the example from the Jira ticket below:
```python
df_a = sql('SELECT 1 as id, named_struct("a", 1) AS s')
path = "/tmp/missing_col_test"
df_a.write.format("parquet").save(path)
df_b = sql('SELECT 2 as id, named_struct("b", 3) AS s')
spark.read.format("parquet").schema(df_b.schema).load(path).show()
```
This used to return:
```
+---+----+
| id| s|
+---+----+
| 1|NULL|
+---+----+
```
It now returns:
```
+---+------+
| id| s|
+---+------+
| 1|{NULL}|
+---+------+
```
### How was this patch tested?
Added new unit tests; also fixed an old test to expect the new behavior.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#52557 from ZiyaZa/missing_struct.
Authored-by: Ziya Mukhtarov <ziya5muxtarov@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…alid Map

### What changes were proposed in this pull request?
This PR fixes a bug from #52557, where we read an additional field if all the requested fields of a struct are missing from the Parquet file. We used to always pick the cheapest leaf column of the struct. However, if this leaf was inside a Map column, then we'd generate an invalid Map type like the following:
```
optional group _1 (MAP) {
  repeated group key_value {
    required boolean key;
  }
}
```
Since there is no `value` field in this group, we'd fail later when trying to convert this Parquet type to a Spark type. This PR changes the additional-field selection logic to enforce selecting a field from both the key and the value of the map, which can now give us a type like the following:
```
optional group _1 (MAP) {
  repeated group key_value {
    required boolean key;
    optional group value {
      optional int32 _2;
    }
  }
}
```

### Why are the changes needed?
To fix a critical bug where we would throw an exception when reading a Parquet file.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #52758 from ZiyaZa/fix-missing-struct-with-map.

Authored-by: Ziya Mukhtarov <ziya5muxtarov@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nd `WritableColumnVectorShim`

In Java, an overriding method's access modifier cannot be more restrictive than the overridden method. Changing from protected to public is safe and ensures compatibility before the Spark version upgrade. See apache/spark#52557
…ark 4.1 (#11313)

* [Fix] Remove `@NotNull` annotations to resolve dependency issues caused by the ORC upgrade. See apache/spark#51676
* [Fix] Make `reserveNewColumn` public in `ArrowWritableColumnVector` and `WritableColumnVectorShim`. In Java, an overriding method's access modifier cannot be more restrictive than the overridden method. Changing from protected to public is safe and ensures compatibility before the Spark version upgrade. See apache/spark#52557
* [Fix] Add GeographyVal and GeometryVal support in ArrowColumnarRow, BatchCarrierRow and ColumnarToCarrierRowExecBase. See [SPIP: Add geospatial types in Spark](https://issues.apache.org/jira/browse/SPARK-51658)
* [Fix] Update commons-collections to version 4.5.0. See apache/spark#52743
* [Fix] Enable the SPARK_TESTING environment variable for Spark test jobs. See apache/spark#53344