Fix parquet test using wrong schema #16133
Conversation
alamb left a comment
Thanks @xudong963 -- I don't fully understand this PR
I read your comment in #16086 (review) saying this test shouldn't pass on main, yet CI seems to be green.
Does that mean #16086 (review) is no longer needed?
@alamb Sorry for the confusion. The PR #16086 is needed. As I said in #16086 (comment), the test in #16086 is a bit unreasonable: it uses the file schema (Utf8) to construct the logical filter expression, when it should use the table schema. However, if the filter is constructed from the table schema, the test in #16086 fails, because https://github.com/apache/datafusion/pull/16086/files#diff-c8eebe4fb9ee7662170c6ce507ad2be92c0229b3a91a2f4431a9bfc7185a0eb9L148 uses the file schema to convert the logical expression to a physical expression. This PR fixes that spot to use the table schema instead.

As for #16086 (review): I ran the following test on different branches to verify. Because the test builds the filter from the table schema, it only runs smoothly with this PR's fix; otherwise it fails as described above. Without #16086, main fails as expected, hence the ❌.

```rust
let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
let batch = create_batch(vec![("c1", c1.clone())]);
let schema = Arc::new(Schema::new(vec![Field::new(
    "c1",
    DataType::Utf8View,
    false,
)]));
// Predicate should prune all row groups
let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
```
alamb left a comment
Thank you very much @xudong963
Force-pushed from ef82a9c to 829ce16
```rust
// If testing with page_index_predicate, write parquet
// files with multiple pages
let multi_page = self.page_index_predicate;
let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
```
The original code uses batches to write the parquet files, which results in the physical file schema used in the parquet source being the table schema (logical file schema), so the tests in #16086 may be meaningless.
I am a little confused about this -- it makes more sense to me that the batches that are written to the file define the file schema.
When the file is read back, that schema may be different (because the table may have a different schema)
I think the key difference is testing when the table schema is different than the file schema
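To make that distinction concrete, here is a minimal sketch (using arrow-rs types; the schemas and values are illustrative, not the exact test fixtures):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// The batch written to the parquet file carries the *file* schema (Utf8).
let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
let file_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![c1]).unwrap();

// The table exposes the same column as Utf8View; when the file is read back,
// predicates must be planned against this schema, not the file schema.
let table_schema = Arc::new(Schema::new(vec![Field::new(
    "c1",
    DataType::Utf8View,
    false,
)]));
```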
Yes, this is my aim in the PR
```diff
  let multi_page = self.page_index_predicate;
- let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+ let (meta, _files) =
+     store_parquet(file_batches.unwrap_or(batches), multi_page)
```
Here I introduced file_batches for writing the parquet files, which means the physical file schema used in the parquet source will be the real file schema.
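For illustration, a test might then be wired up roughly like this (the builder and method names below are assumptions about the test harness, not its exact API):

```rust
// `file_batch` (Utf8) defines the physical file schema on disk, while
// `table_schema` (Utf8View) is what the scan plans and prunes against.
let round_trip = RoundTrip::new()
    .with_schema(table_schema)           // logical/table schema
    .with_file_batches(vec![file_batch]) // physical file contents
    .round_trip(vec![batch])
    .await;
```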
```diff
  // Predicate should prune all row groups
- let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
+ let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("aaa".to_string()))));
```
Use Utf8View to align with the table schema
```diff
  // Predicate should prune no row groups
- let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
+ let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("foo".to_string()))));
```
ditto
```diff
  // Predicate should prune all row groups
- let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+ let filter = col("c1").eq(lit(ScalarValue::Int8(Some(5))));
```
Use Int8 to align with the table schema
This change makes sense to me
```diff
  // Predicate should prune no row groups
- let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+ let filter = col("c1").eq(lit(ScalarValue::Int8(Some(1))));
```
ditto
alamb left a comment
Thank you for this work @xudong963 -- I am confused both about the original tests as well as this new PR 😕
A parquet file has only a single schema -- so I find the tests that write batches with different schemas to the file very confusing. The schema evolution in production happens when the data is read from the files, as you are pointing out.
I think the tests should actually be writing different files when the batches have different schemas rather than merging the schemas together before writing:
datafusion/core/src/datasource/physical_plan/parquet.rs, lines 203 to 212 in ca55f1c:
```rust
let file_schema = match &self.schema {
    Some(schema) => schema,
    None => &Arc::new(
        Schema::try_merge(
            batches.iter().map(|b| b.schema().as_ref().clone()),
        )
        .unwrap(),
    ),
};
let file_schema = Arc::clone(file_schema);
```
It would then be fine on the read side to configure the same ParquetExec to read multiple files with different schemas.
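A minimal sketch of that suggestion, writing one parquet file per batch rather than merging schemas first (the helper name and paths here are illustrative, not code from this PR):

```rust
use std::fs::File;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

// Each batch goes to its own file, so every file keeps its own schema;
// schema evolution is then exercised purely on the read side.
fn write_one_file_per_batch(
    batches: Vec<RecordBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    for (i, batch) in batches.into_iter().enumerate() {
        let file = File::create(format!("/tmp/part-{i}.parquet"))?;
        let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
        writer.write(&batch)?;
        writer.close()?;
    }
    Ok(())
}
```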
I don't think this PR reflects the intent of the tests
Let me see if I can come up with something different
I spent some time reviewing the tests and I think they are using the correct schema. I made a PR to try and clarify the comments
Which issue does this PR close?
While reviewing #16086, I found the parquet test using the wrong schema to build predicates.
Rationale for this change
Use the table schema, rather than the file schema, to convert the logical filter expression to a physical expression.
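A minimal sketch of that idea (assuming datafusion's public re-exports; this is not the exact code touched by the PR):

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{DFSchema, Result, ScalarValue};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{col, lit};
use datafusion::physical_expr::create_physical_expr;

fn plan_predicate_against_table_schema() -> Result<()> {
    // The table schema (Utf8View), not the file schema (Utf8), drives the
    // logical-to-physical conversion of the predicate.
    let table_schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let df_schema = DFSchema::try_from(table_schema)?;
    let predicate = col("c1").eq(lit(ScalarValue::Utf8View(Some("aaa".to_string()))));
    let _physical = create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;
    Ok(())
}
```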
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?