From b1d5d4fa9904f245bb029eed7320160092dbd44c Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 21 May 2025 18:17:57 +0800 Subject: [PATCH 1/3] Fix parquet test using wrong schema --- .../src/datasource/physical_plan/parquet.rs | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index fb4eb13db1cf..42ef22a627f5 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -146,12 +146,16 @@ mod tests { self.round_trip(batches).await.batches } - fn build_file_source(&self, file_schema: SchemaRef) -> Arc { + fn build_file_source( + &self, + table_schema: SchemaRef, + file_schema: SchemaRef, + ) -> Arc { // set up predicate (this is normally done by a layer higher up) let predicate = self .predicate .as_ref() - .map(|p| logical2physical(p, &file_schema)); + .map(|p| logical2physical(p, &table_schema)); let mut source = ParquetSource::default(); if let Some(predicate) = predicate { @@ -200,14 +204,17 @@ mod tests { /// run the test, returning the `RoundTripResult` async fn round_trip(&self, batches: Vec) -> RoundTripResult { - let file_schema = match &self.schema { - Some(schema) => schema, - None => &Arc::new( + let table_schema = self.schema.clone().unwrap_or_else(|| { + Arc::new( Schema::try_merge( batches.iter().map(|b| b.schema().as_ref().clone()), ) .unwrap(), - ), + ) + }); + let file_schema = match &self.schema { + Some(schema) => schema, + None => &table_schema, }; let file_schema = Arc::clone(file_schema); // If testing with page_index_predicate, write parquet @@ -217,7 +224,8 @@ mod tests { let file_group: FileGroup = meta.into_iter().map(Into::into).collect(); // build a ParquetExec to return the results - let parquet_source = self.build_file_source(file_schema.clone()); + let parquet_source = + self.build_file_source(table_schema.clone(), file_schema.clone()); let parquet_exec = self.build_parquet_exec( file_schema.clone(), file_group.clone(), @@ -231,7 +239,7 @@ mod tests { self.build_parquet_exec( file_schema.clone(), file_group.clone(), - self.build_file_source(file_schema.clone()), + self.build_file_source(table_schema.clone(), file_schema.clone()), ), Arc::new(Schema::new(vec![ Field::new("plan_type", DataType::Utf8, true), From 850dc957b304fe5c881f5003040ed92f9598150a Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 21 May 2025 21:05:54 +0800 Subject: [PATCH 2/3] Fix tests --- .../src/datasource/physical_plan/parquet.rs | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 42ef22a627f5..9b2eb4b7389e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -38,8 +38,8 @@ mod tests { use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use crate::test::object_store::local_unpartitioned_file; use arrow::array::{ - ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray, - StringViewArray, StructArray, + Array, ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, + StringArray, StringViewArray, StructArray, }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; @@ -204,14 +204,10 @@ mod tests { /// run the test, returning the `RoundTripResult` async fn round_trip(&self, batches: Vec) -> RoundTripResult { - let table_schema = self.schema.clone().unwrap_or_else(|| { - Arc::new( - Schema::try_merge( - batches.iter().map(|b| b.schema().as_ref().clone()), - ) + let table_schema = Arc::new( + Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone())) .unwrap(), - ) - }); + ); let file_schema = match &self.schema { Some(schema) => schema, None => &table_schema, @@ -882,7 +878,7 @@ mod tests { let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)])); // Predicate should prune all row groups - let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string())))); + let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("aaa".to_string())))); let rt = RoundTrip::new() .with_predicate(filter) .with_schema(schema.clone()) @@ -895,7 +891,7 @@ mod tests { assert_eq!(rt.batches.unwrap().len(), 0); // Predicate should prune no row groups - let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string())))); + let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("foo".to_string())))); let rt = RoundTrip::new() .with_predicate(filter) .with_schema(schema) @@ -924,7 +920,7 @@ mod tests { Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)])); // Predicate should prune all row groups - let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5)))); + let filter = col("c1").eq(lit(ScalarValue::Int8(Some(5)))); let rt = RoundTrip::new() .with_predicate(filter) .with_schema(schema.clone()) @@ -936,7 +932,7 @@ mod tests { assert_eq!(rt.batches.unwrap().len(), 0); // Predicate should prune no row groups - let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1)))); + let filter = col("c1").eq(lit(ScalarValue::Int8(Some(1)))); let rt = RoundTrip::new() .with_predicate(filter) .with_schema(schema) From 676e30f86fb94272a73d01c92b0b37457ca43b69 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 22 May 2025 16:57:14 +0800 Subject: [PATCH 3/3] fix more --- .../src/datasource/physical_plan/parquet.rs | 158 +++++++++++------- 1 file changed, 102 insertions(+), 56 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 9b2eb4b7389e..a580f3b5d053 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -39,7 +39,7 @@ mod tests { use crate::test::object_store::local_unpartitioned_file; use arrow::array::{ Array, ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, - StringArray, StringViewArray, StructArray, + StringArray, StructArray, UInt64Array, }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; @@ -96,7 +96,8 @@ mod tests { #[derive(Debug, Default)] struct RoundTrip { projection: Option>, - schema: Option, + physical_file_schema: Option, + logical_file_schema: Option, predicate: Option, pushdown_predicate: bool, page_index_predicate: bool, @@ -113,8 +114,13 @@ mod tests { self } - fn with_schema(mut self, schema: SchemaRef) -> Self { - self.schema = Some(schema); + fn with_physical_file_schema(mut self, physical_file_schema: SchemaRef) -> Self { + self.physical_file_schema = Some(physical_file_schema); + self + } + + fn with_logical_file_schema(mut self, logical_field_schema: SchemaRef) -> Self { + self.logical_file_schema = Some(logical_field_schema); self } @@ -187,13 +193,13 @@ mod tests { fn build_parquet_exec( &self, - file_schema: SchemaRef, + logical_file_schema: SchemaRef, file_group: FileGroup, source: Arc, ) -> Arc { let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), - file_schema, + logical_file_schema, // It'll be used to initialize `logical_file_schema` in source source, ) .with_file_group(file_group) @@ -204,26 +210,43 @@ mod tests { /// run the test, returning the `RoundTripResult` async fn round_trip(&self, batches: Vec) -> RoundTripResult { - let table_schema = Arc::new( - Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone())) - .unwrap(), - ); - let file_schema = match &self.schema { + self.round_trip_with_file_batches(batches, None).await + } + + /// run the test, returning the `RoundTripResult` + /// If your table schema is different from file schema, you may need to specify the `file_batches` with the file schema + /// Or the file schema in the parquet source will be table schema, see `store_parquet` for detail + async fn round_trip_with_file_batches( + &self, + batches: Vec, + file_batches: Option>, + ) -> RoundTripResult { + let batches_schema = + Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone())); + let file_schema = match &self.physical_file_schema { Some(schema) => schema, - None => &table_schema, + None => &Arc::new(batches_schema.as_ref().unwrap().clone()), }; let file_schema = Arc::clone(file_schema); + let table_schema = match &self.logical_file_schema { + Some(schema) => schema, + None => &Arc::new(batches_schema.as_ref().unwrap().clone()), + }; + // If testing with page_index_predicate, write parquet // files with multiple pages let multi_page = self.page_index_predicate; - let (meta, _files) = store_parquet(batches, multi_page).await.unwrap(); + let (meta, _files) = + store_parquet(file_batches.unwrap_or(batches), multi_page) + .await + .unwrap(); let file_group: FileGroup = meta.into_iter().map(Into::into).collect(); // build a ParquetExec to return the results let parquet_source = self.build_file_source(table_schema.clone(), file_schema.clone()); let parquet_exec = self.build_parquet_exec( - file_schema.clone(), + table_schema.clone(), file_group.clone(), Arc::clone(&parquet_source), ); @@ -233,7 +256,7 @@ mod tests { false, // use a new ParquetSource to avoid sharing execution metrics self.build_parquet_exec( - file_schema.clone(), + table_schema.clone(), file_group.clone(), self.build_file_source(table_schema.clone(), file_schema.clone()), ), @@ -301,17 +324,18 @@ mod tests { Field::new("c2", DataType::Int32, true), ])); - let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit(1_i32)); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let total_rows = rt .batches @@ -327,10 +351,11 @@ mod tests { // If we excplicitly allow nulls the rest of the predicate should work let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -359,17 +384,18 @@ mod tests { Field::new("c2", DataType::Utf8, true), ])); - let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit("abc")); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let total_rows = rt .batches @@ -385,10 +411,11 @@ mod tests { // If we excplicitly allow nulls the rest of the predicate should work let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -421,17 +448,18 @@ mod tests { Field::new("c3", DataType::Int32, true), ])); - let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit("abc")); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let total_rows = rt .batches @@ -447,10 +475,11 @@ mod tests { // If we excplicitly allow nulls the rest of the predicate should work let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -482,7 +511,7 @@ mod tests { Field::new("c3", DataType::Int32, true), ])); - let batch = + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap(); // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, @@ -490,10 +519,11 @@ mod tests { // Thus this predicate will come back as false. let filter = col("c2").eq(lit("abc")); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let total_rows = rt .batches @@ -509,10 +539,11 @@ mod tests { // If we excplicitly allow nulls the rest of the predicate should work let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32))); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -546,7 +577,7 @@ mod tests { Field::new("c3", DataType::Int32, true), ])); - let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); // Test with complex nested AND/OR: // (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL) @@ -557,10 +588,11 @@ mod tests { .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null())); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let batches = rt.batches.unwrap(); @@ -587,10 +619,11 @@ mod tests { .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null())); let rt = RoundTrip::new() - .with_schema(table_schema) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -871,18 +904,25 @@ mod tests { #[tokio::test] async fn evolved_schema_column_type_filter_strings() { // The table and filter have a common data type, but the file schema differs - let c1: ArrayRef = - Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")])); - let batch = create_batch(vec![("c1", c1.clone())]); + let table_schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::Utf8View, + false, + )])); - let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)])); + let file_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)])); + let file_c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])); + let file_batch = create_batch(vec![("c1", file_c1.clone())]); // Predicate should prune all row groups let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("aaa".to_string())))); let rt = RoundTrip::new() .with_predicate(filter) - .with_schema(schema.clone()) - .round_trip(vec![batch.clone()]) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); @@ -894,8 +934,9 @@ mod tests { let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("foo".to_string())))); let rt = RoundTrip::new() .with_predicate(filter) - .with_schema(schema) - .round_trip(vec![batch]) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); @@ -913,18 +954,21 @@ mod tests { #[tokio::test] async fn evolved_schema_column_type_filter_ints() { // The table and filter have a common data type, but the file schema differs - let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)])); - let batch = create_batch(vec![("c1", c1.clone())]); + let table_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int8, false)])); - let schema = + let file_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)])); + let file_c1: ArrayRef = Arc::new(UInt64Array::from(vec![Some(1), Some(2)])); + let file_batch = create_batch(vec![("c1", file_c1.clone())]); // Predicate should prune all row groups let filter = col("c1").eq(lit(ScalarValue::Int8(Some(5)))); let rt = RoundTrip::new() .with_predicate(filter) - .with_schema(schema.clone()) - .round_trip(vec![batch.clone()]) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); @@ -935,8 +979,9 @@ mod tests { let filter = col("c1").eq(lit(ScalarValue::Int8(Some(1)))); let rt = RoundTrip::new() .with_predicate(filter) - .with_schema(schema) - .round_trip(vec![batch]) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); @@ -1195,7 +1240,8 @@ mod tests { // read/write them files: let read = RoundTrip::new() - .with_schema(Arc::new(schema)) + .with_physical_file_schema(Arc::new(schema.clone())) + .with_logical_file_schema(Arc::new(schema)) .round_trip_to_batches(vec![batch1, batch2]) .await; assert_contains!(read.unwrap_err().to_string(),