diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index fb4eb13db1cf8..a580f3b5d053c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -38,8 +38,8 @@ mod tests {
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::{
-        ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
-        StringViewArray, StructArray,
+        Array, ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array,
+        StringArray, StructArray, UInt64Array,
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
@@ -96,7 +96,8 @@ mod tests {
     #[derive(Debug, Default)]
     struct RoundTrip {
         projection: Option<Vec<usize>>,
-        schema: Option<SchemaRef>,
+        physical_file_schema: Option<SchemaRef>,
+        logical_file_schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
         page_index_predicate: bool,
@@ -113,8 +114,13 @@ mod tests {
             self
         }
 
-        fn with_schema(mut self, schema: SchemaRef) -> Self {
-            self.schema = Some(schema);
+        fn with_physical_file_schema(mut self, physical_file_schema: SchemaRef) -> Self {
+            self.physical_file_schema = Some(physical_file_schema);
+            self
+        }
+
+        fn with_logical_file_schema(mut self, logical_file_schema: SchemaRef) -> Self {
+            self.logical_file_schema = Some(logical_file_schema);
             self
         }
 
@@ -146,12 +152,16 @@ mod tests {
            self.round_trip(batches).await.batches
        }

-        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
+        fn build_file_source(
+            &self,
+            table_schema: SchemaRef,
+            file_schema: SchemaRef,
+        ) -> Arc<ParquetSource> {
            // set up predicate (this is normally done by a layer higher up)
            let predicate = self
                .predicate
                .as_ref()
-                .map(|p| logical2physical(p, &file_schema));
+                .map(|p| logical2physical(p, &table_schema));

            let mut source = ParquetSource::default();
            if let Some(predicate) = predicate {
@@ -183,13 +193,13 @@ mod tests {

        fn build_parquet_exec(
            &self,
-            file_schema: SchemaRef,
+            logical_file_schema: SchemaRef,
            file_group: FileGroup,
            source: Arc<ParquetSource>,
        ) -> Arc<DataSourceExec> {
            let base_config = FileScanConfigBuilder::new(
                ObjectStoreUrl::local_filesystem(),
-                file_schema,
+                logical_file_schema, // used to initialize `logical_file_schema` in the source
                source,
            )
            .with_file_group(file_group)
@@ -200,26 +210,43 @@ mod tests {

        /// run the test, returning the `RoundTripResult`
        async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
-            let file_schema = match &self.schema {
+            self.round_trip_with_file_batches(batches, None).await
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        /// If the table schema differs from the file schema, pass the batches to write via `file_batches`;
+        /// otherwise the file is written with the table schema (see `store_parquet` for details)
+        async fn round_trip_with_file_batches(
+            &self,
+            batches: Vec<RecordBatch>,
+            file_batches: Option<Vec<RecordBatch>>,
+        ) -> RoundTripResult {
+            let batches_schema =
+                Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()));
+            let file_schema = match &self.physical_file_schema {
                Some(schema) => schema,
-                None => &Arc::new(
-                    Schema::try_merge(
-                        batches.iter().map(|b| b.schema().as_ref().clone()),
-                    )
-                    .unwrap(),
-                ),
+                None => &Arc::new(batches_schema.as_ref().unwrap().clone()),
            };
            let file_schema = Arc::clone(file_schema);
+            let table_schema = match &self.logical_file_schema {
+                Some(schema) => schema,
+                None => &Arc::new(batches_schema.as_ref().unwrap().clone()),
+            };
+
            // If testing with page_index_predicate, write parquet
            // files with multiple pages
            let multi_page = self.page_index_predicate;
-            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+            let (meta, _files) =
+                store_parquet(file_batches.unwrap_or(batches), multi_page)
+                    .await
+                    .unwrap();
            let file_group: FileGroup = meta.into_iter().map(Into::into).collect();

            // build a ParquetExec to return the results
-            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_source =
+                self.build_file_source(table_schema.clone(), file_schema.clone());
            let parquet_exec = self.build_parquet_exec(
-                file_schema.clone(),
+                table_schema.clone(),
                file_group.clone(),
                Arc::clone(&parquet_source),
            );
@@ -229,9 +256,9 @@ mod tests {
                false,
                // use a new ParquetSource to avoid sharing execution metrics
                self.build_parquet_exec(
-                    file_schema.clone(),
+                    table_schema.clone(),
                    file_group.clone(),
-                    self.build_file_source(file_schema.clone()),
+                    self.build_file_source(table_schema.clone(), file_schema.clone()),
                ),
                Arc::new(Schema::new(vec![
                    Field::new("plan_type", DataType::Utf8, true),
@@ -297,17 +324,18 @@ mod tests {
            Field::new("c2", DataType::Int32, true),
        ]));

-        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+        let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();

        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
        // the default behavior is to fill in missing columns with nulls.
        // Thus this predicate will come back as false.
        let filter = col("c2").eq(lit(1_i32));
        let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_physical_file_schema(file_schema.clone())
+            .with_logical_file_schema(table_schema.clone())
            .with_predicate(filter.clone())
            .with_pushdown_predicate()
-            .round_trip(vec![batch.clone()])
+            .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()]))
            .await;
        let total_rows = rt
            .batches
@@ -323,10 +351,11 @@ mod tests {
        // If we excplicitly allow nulls the rest of the predicate should work
        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
        let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_physical_file_schema(file_schema)
+            .with_logical_file_schema(table_schema)
            .with_predicate(filter.clone())
            .with_pushdown_predicate()
-            .round_trip(vec![batch.clone()])
+            .round_trip_with_file_batches(vec![], Some(vec![file_batch]))
            .await;

        let batches = rt.batches.unwrap();
@@ -355,17 +384,18 @@ mod tests {
            Field::new("c2", DataType::Utf8, true),
        ]));

-        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+        let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();

        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
        // the default behavior is to fill in missing columns with nulls.
        // Thus this predicate will come back as false.
let filter = col("c2").eq(lit("abc")); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let total_rows = rt .batches @@ -381,10 +411,11 @@ mod tests { // If we excplicitly allow nulls the rest of the predicate should work let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -417,17 +448,18 @@ mod tests { Field::new("c3", DataType::Int32, true), ])); - let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. let filter = col("c2").eq(lit("abc")); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let total_rows = rt .batches @@ -443,10 +475,11 @@ mod tests { // If we excplicitly allow nulls the rest of the predicate should work let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -478,7 +511,7 @@ mod tests { Field::new("c3", DataType::Int32, true), ])); - let batch = + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap(); // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, @@ -486,10 +519,11 @@ mod tests { // Thus this predicate will come back as false. 
let filter = col("c2").eq(lit("abc")); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let total_rows = rt .batches @@ -505,10 +539,11 @@ mod tests { // If we excplicitly allow nulls the rest of the predicate should work let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32))); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -542,7 +577,7 @@ mod tests { Field::new("c3", DataType::Int32, true), ])); - let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); + let file_batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); // Test with complex nested AND/OR: // (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL) @@ -553,10 +588,11 @@ mod tests { .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null())); let rt = RoundTrip::new() - .with_schema(table_schema.clone()) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch.clone()]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; let batches = rt.batches.unwrap(); @@ -583,10 +619,11 @@ mod tests { .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null())); let rt = RoundTrip::new() - .with_schema(table_schema) + .with_physical_file_schema(file_schema) + .with_logical_file_schema(table_schema) .with_predicate(filter.clone()) .with_pushdown_predicate() - .round_trip(vec![batch]) + .round_trip_with_file_batches(vec![], Some(vec![file_batch])) .await; let batches = rt.batches.unwrap(); @@ -867,18 +904,25 @@ mod tests { #[tokio::test] async fn evolved_schema_column_type_filter_strings() { // The table and filter have a common data type, but the file schema differs - let c1: ArrayRef = - Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")])); - let batch = create_batch(vec![("c1", c1.clone())]); + let table_schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::Utf8View, + false, + )])); - let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)])); + let file_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)])); + let file_c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])); + let file_batch = create_batch(vec![("c1", file_c1.clone())]); // Predicate should prune all row groups - let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string())))); + let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("aaa".to_string())))); let rt = RoundTrip::new() .with_predicate(filter) - .with_schema(schema.clone()) - .round_trip(vec![batch.clone()]) + .with_physical_file_schema(file_schema.clone()) + .with_logical_file_schema(table_schema.clone()) + .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()])) .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); @@ -887,11 +931,12 @@ mod tests { 
@@ -887,11 +931,12 @@ mod tests {
        assert_eq!(rt.batches.unwrap().len(), 0);

        // Predicate should prune no row groups
-        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
+        let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("foo".to_string()))));
        let rt = RoundTrip::new()
            .with_predicate(filter)
-            .with_schema(schema)
-            .round_trip(vec![batch])
+            .with_physical_file_schema(file_schema)
+            .with_logical_file_schema(table_schema)
+            .round_trip_with_file_batches(vec![], Some(vec![file_batch]))
            .await;
        // There should be no predicate evaluation errors
        let metrics = rt.parquet_exec.metrics().unwrap();
@@ -909,18 +954,21 @@ mod tests {
    #[tokio::test]
    async fn evolved_schema_column_type_filter_ints() {
        // The table and filter have a common data type, but the file schema differs
-        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
-        let batch = create_batch(vec![("c1", c1.clone())]);
+        let table_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int8, false)]));

-        let schema =
+        let file_schema =
            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
+        let file_c1: ArrayRef = Arc::new(UInt64Array::from(vec![Some(1), Some(2)]));
+        let file_batch = create_batch(vec![("c1", file_c1.clone())]);

        // Predicate should prune all row groups
-        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+        let filter = col("c1").eq(lit(ScalarValue::Int8(Some(5))));
        let rt = RoundTrip::new()
            .with_predicate(filter)
-            .with_schema(schema.clone())
-            .round_trip(vec![batch.clone()])
+            .with_physical_file_schema(file_schema.clone())
+            .with_logical_file_schema(table_schema.clone())
+            .round_trip_with_file_batches(vec![], Some(vec![file_batch.clone()]))
            .await;
        // There should be no predicate evaluation errors
        let metrics = rt.parquet_exec.metrics().unwrap();
@@ -928,11 +976,12 @@ mod tests {
        assert_eq!(rt.batches.unwrap().len(), 0);

        // Predicate should prune no row groups
-        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+        let filter = col("c1").eq(lit(ScalarValue::Int8(Some(1))));
        let rt = RoundTrip::new()
            .with_predicate(filter)
-            .with_schema(schema)
-            .round_trip(vec![batch])
+            .with_physical_file_schema(file_schema)
+            .with_logical_file_schema(table_schema)
+            .round_trip_with_file_batches(vec![], Some(vec![file_batch]))
            .await;
        // There should be no predicate evaluation errors
        let metrics = rt.parquet_exec.metrics().unwrap();
@@ -1191,7 +1240,8 @@ mod tests {

        // read/write them files:
        let read = RoundTrip::new()
-            .with_schema(Arc::new(schema))
+            .with_physical_file_schema(Arc::new(schema.clone()))
+            .with_logical_file_schema(Arc::new(schema))
            .round_trip_to_batches(vec![batch1, batch2])
            .await;
        assert_contains!(read.unwrap_err().to_string(),
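
The pattern the updated tests follow, sketched below as one self-contained test. This is a sketch only: it reuses the `RoundTrip` builder and `create_batch` helper from this test module as shown in the diff, the test name is illustrative, and the import paths are assumptions about how the types are re-exported; everything else is ordinary arrow/DataFusion test setup. The key point is that `with_physical_file_schema` describes what is actually written to the Parquet file, `with_logical_file_schema` describes the table schema the predicate is resolved against, and `round_trip_with_file_batches` writes the file from batches typed with the file schema.

    // Sketch of a test using the new split-schema API (hypothetical test name).
    use std::sync::Arc;

    use arrow::array::{ArrayRef, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::prelude::{col, lit};
    use datafusion::scalar::ScalarValue;

    #[tokio::test]
    async fn round_trip_with_differing_schemas_sketch() {
        // Logical (table) schema: the type the query engine sees.
        let table_schema = Arc::new(Schema::new(vec![Field::new(
            "c1",
            DataType::Utf8View,
            false,
        )]));
        // Physical (file) schema: the type actually stored in the Parquet file.
        let file_schema =
            Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));

        // Batches written to the file use the *file* schema, not the table schema.
        let file_c1: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
        let file_batch = create_batch(vec![("c1", file_c1)]);

        // The predicate is written against the table type (Utf8View); the schema
        // adapter is responsible for evaluating it against the Utf8 file data.
        let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("foo".to_string()))));

        let rt = RoundTrip::new()
            .with_predicate(filter)
            .with_physical_file_schema(file_schema)
            .with_logical_file_schema(table_schema)
            // no table-schema batches; write the file from `file_batch` instead
            .round_trip_with_file_batches(vec![], Some(vec![file_batch]))
            .await;

        // The round trip should succeed despite the schema mismatch.
        assert!(rt.batches.is_ok());
    }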