From f81559b63b3a6a4099644b68d94313a233643d48 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Tue, 17 Jun 2025 13:39:07 -0500
Subject: [PATCH 01/26] wip

---
 datafusion/datasource-parquet/src/opener.rs | 62 ++++++++++++++++++++-
 1 file changed, 60 insertions(+), 2 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 285044803d73c..6d9b30f44fef1 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -25,6 +25,8 @@ use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
     ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
 };
+use arrow::compute::can_cast_types;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
@@ -35,7 +37,7 @@ use datafusion_common::pruning::{
     CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics,
     PruningStatistics,
 };
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, Result, ScalarValue};
 use datafusion_datasource::PartitionedFile;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::pruning::PruningPredicate;
@@ -248,10 +250,16 @@ impl FileOpener for ParquetOpener {
             }
         }

+        let predicate = predicate
+            .map(|p| {
+                cast_expr_to_schema(p, &physical_file_schema, &logical_file_schema)
+            })
+            .transpose()?;
+
         // Build predicates for this specific file
         let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
             predicate.as_ref(),
-            &logical_file_schema,
+            &physical_file_schema,
             &predicate_creation_errors,
         );

@@ -524,6 +532,56 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }

+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and the file's physical and logical schemas, returns a new
+/// [`PhysicalExpr`] adapted to the physical file schema: columns that are missing from
+/// the file are replaced with null literals, and columns whose physical type differs
+/// from the logical (table) type are wrapped in a cast to the logical type.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = logical_file_schema.field_with_name(column.name())?;
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do.
+                let value = ScalarValue::Null.cast_to(logical_field.data_type())?;
+                return Ok(Transformed::yes(expressions::lit(value)));
+            };
+
+            if logical_field.data_type() == physical_field.data_type() {
+                return Ok(Transformed::no(expr));
+            }
+
+            // If the logical field and physical field are different, we need to cast
+            // the column to the logical field's data type.
+            // We will try later to move the cast to literal values if possible, which is computationally cheaper.
+ if !can_cast_types(logical_field.data_type(), physical_field.data_type()) { + return exec_err!( + "Cannot cast column '{}' from '{}' to '{}'", + column.name(), + logical_field.data_type(), + physical_field.data_type() + ); + } + let casted_expr = Arc::new(expressions::CastExpr::new( + expr, + logical_field.data_type().clone(), + None, + )); + return Ok(Transformed::yes(casted_expr)); + } + + Ok(Transformed::no(expr)) + }) + .data() +} + #[cfg(test)] mod test { use std::sync::Arc; From 30ebecc7f869d39702e0b3371df5d6fd570fc3ac Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 13:13:23 -0500 Subject: [PATCH 02/26] adapt filter expressions to file schema during parquet scan --- datafusion/datasource-parquet/src/opener.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6d9b30f44fef1..a8624c6d56e57 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -549,6 +549,12 @@ pub fn cast_expr_to_schema( let logical_field = logical_file_schema.field_with_name(column.name())?; let Ok(physical_field) = physical_file_schema.field_with_name(column.name()) else { + if !logical_field.is_nullable() { + return exec_err!( + "Non-nullable column '{}' is missing from the physical schema", + column.name() + ); + } // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. let value = ScalarValue::Null.cast_to(logical_field.data_type())?; return Ok(Transformed::yes(expressions::lit(value))); From c3b825bdd525d23499fd83d9dd7b4b8e6f3a816e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 13:41:58 -0500 Subject: [PATCH 03/26] handle partition values --- datafusion/datasource-parquet/src/opener.rs | 32 +++++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index a8624c6d56e57..bd51ff1e7ea44 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -161,7 +161,7 @@ impl FileOpener for ParquetOpener { if let Some(pruning_predicate) = pruning_predicate { // The partition column schema is the schema of the table - the schema of the file let mut pruning = Box::new(PartitionPruningStatistics::try_new( - vec![file.partition_values], + vec![file.partition_values.clone()], partition_fields.clone(), )?) as Box; @@ -252,7 +252,14 @@ impl FileOpener for ParquetOpener { let predicate = predicate .map(|p| { - cast_expr_to_schema(p, &physical_file_schema, &logical_file_schema) + cast_expr_to_schema( + p, + &physical_file_schema, + &logical_file_schema, + file.partition_values, + &partition_fields, + ) + .map_err(ArrowError::from) }) .transpose()?; @@ -543,10 +550,29 @@ pub fn cast_expr_to_schema( expr: Arc, physical_file_schema: &Schema, logical_file_schema: &Schema, + partition_values: Vec, + partition_fields: &[FieldRef], ) -> Result> { expr.transform(|expr| { if let Some(column) = expr.as_any().downcast_ref::() { - let logical_field = logical_file_schema.field_with_name(column.name())?; + let logical_field = match logical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(e) => { + // Is this a partition field? 
+ for (partition_field, partition_value) in + partition_fields.iter().zip(partition_values.iter()) + { + if partition_field.name() == column.name() { + // If the column is a partition field, we can use the partition value + return Ok(Transformed::yes(expressions::lit( + partition_value.clone(), + ))); + } + } + // If the column is not found in the logical schema, return an error + return Err(e.into()); + } + }; let Ok(physical_field) = physical_file_schema.field_with_name(column.name()) else { if !logical_field.is_nullable() { From 636fc7f402709ae7e6fcb9e22a075535f7ad9b6d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 13:47:18 -0500 Subject: [PATCH 04/26] add more comments --- datafusion/datasource-parquet/src/opener.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bd51ff1e7ea44..09a596b23af87 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -558,18 +558,19 @@ pub fn cast_expr_to_schema( let logical_field = match logical_file_schema.field_with_name(column.name()) { Ok(field) => field, Err(e) => { - // Is this a partition field? + // If the column is a partition field, we can use the partition value for (partition_field, partition_value) in partition_fields.iter().zip(partition_values.iter()) { if partition_field.name() == column.name() { - // If the column is a partition field, we can use the partition value return Ok(Transformed::yes(expressions::lit( partition_value.clone(), ))); } } // If the column is not found in the logical schema, return an error + // This should probably never be hit unless something upstream broke, but nontheless it's better + // for us to return a handleable error than to panic / do something unexpected. return Err(e.into()); } }; @@ -582,6 +583,8 @@ pub fn cast_expr_to_schema( ); } // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. + // TODO: do we need to sync this with what the `SchemaAdapter` actually does? + // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else! let value = ScalarValue::Null.cast_to(logical_field.data_type())?; return Ok(Transformed::yes(expressions::lit(value))); }; @@ -595,7 +598,7 @@ pub fn cast_expr_to_schema( // We will try later to move the cast to literal values if possible, which is computationally cheaper. 
if !can_cast_types(logical_field.data_type(), physical_field.data_type()) { return exec_err!( - "Cannot cast column '{}' from '{}' to '{}'", + "Cannot cast column '{}' from '{}' (file data type) to '{}' (table data type)", column.name(), logical_field.data_type(), physical_field.data_type() From 4cee1e9425c27ad1790b82a27ef5518b3e7fa830 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 13:53:54 -0500 Subject: [PATCH 05/26] add a new test --- datafusion/datasource-parquet/src/opener.rs | 94 +++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 09a596b23af87..22951c8e23b11 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -972,4 +972,98 @@ mod test { assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); } + + #[tokio::test] + async fn test_prune_on_partition_value_and_data_value() { + let store = Arc::new(InMemory::new()) as Arc; + + // Note: number 3 is missing! + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(4)])).unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await; + + let file_schema = batch.schema(); + let mut file = PartitionedFile::new( + "part=1/file.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + file.partition_values = vec![ScalarValue::Int32(Some(1))]; + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("part", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + + let make_opener = |predicate| { + ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(predicate), + logical_file_schema: file_schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new( + DefaultParquetFileReaderFactory::new(Arc::clone(&store)), + ), + partition_fields: vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))], + pushdown_filters: true, // note that this is true! + reorder_filters: true, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: false, // note that this is false! 
+ coerce_int96: None, + } + }; + + let make_meta = || FileMeta { + object_meta: ObjectMeta { + location: Path::from("part=1/file.parquet"), + last_modified: Utc::now(), + size: u64::try_from(data_size).unwrap(), + e_tag: None, + version: None, + }, + range: None, + extensions: None, + metadata_size_hint: None, + }; + + // Filter should match the partition value and data value + let expr = col("part").eq(lit(1)).and(col("a").eq(lit(1))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 1); + + // Filter should match the partition value but not the data value + let expr = col("part").eq(lit(1)).and(col("a").eq(lit(3))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + + // Filter should not match the partition value but match the data value + let expr = col("part").eq(lit(2)).and(col("a").eq(lit(1))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + } } From 35b306fc6e22ca2584662851dd2cff85ed0e7243 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 13:58:25 -0500 Subject: [PATCH 06/26] better test? 
--- datafusion/datasource-parquet/src/opener.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 22951c8e23b11..ffff424c6d54b 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1036,7 +1036,7 @@ mod test { }; // Filter should match the partition value and data value - let expr = col("part").eq(lit(1)).and(col("a").eq(lit(1))); + let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); let stream = opener @@ -1046,23 +1046,32 @@ mod test { .unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); - assert_eq!(num_rows, 1); + assert_eq!(num_rows, 3); // Filter should match the partition value but not the data value - let expr = col("part").eq(lit(1)).and(col("a").eq(lit(3))); + let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; - assert_eq!(num_batches, 0); - assert_eq!(num_rows, 0); + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 3); // Filter should not match the partition value but match the data value - let expr = col("part").eq(lit(2)).and(col("a").eq(lit(1))); + let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 1); + + // Filter should not match the partition value or the data value + let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); } From b38b9ca63b7353391312b7633f2d6791855256ae Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 14:19:03 -0500 Subject: [PATCH 07/26] fmt --- datafusion/datasource-parquet/src/opener.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index ffff424c6d54b..16e86110ede45 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1052,7 +1052,11 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap(); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1061,7 +1065,11 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let 
opener = make_opener(predicate); - let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap(); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 1); From 06813c21f6d43287f4163632a7dd2a83a61e9694 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 23:55:53 -0500 Subject: [PATCH 08/26] remove schema adapters --- datafusion/datasource-parquet/src/opener.rs | 6 +- .../datasource-parquet/src/row_filter.rs | 132 +----------------- 2 files changed, 8 insertions(+), 130 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 16e86110ede45..c584b1ae060eb 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -39,6 +39,7 @@ use datafusion_common::pruning::{ }; use datafusion_common::{exec_err, Result, ScalarValue}; use datafusion_datasource::PartitionedFile; +use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; @@ -119,7 +120,6 @@ impl FileOpener for ParquetOpener { let projected_schema = SchemaRef::from(self.logical_file_schema.project(&self.projection)?); - let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory); let schema_adapter = self .schema_adapter_factory .create(projected_schema, Arc::clone(&self.logical_file_schema)); @@ -260,7 +260,9 @@ impl FileOpener for ParquetOpener { &partition_fields, ) .map_err(ArrowError::from) + .map(|p| reassign_predicate_columns(p, &physical_file_schema, false)) }) + .transpose()? 
            .transpose()?;

        // Build predicates for this specific file
@@ -303,11 +305,9 @@ impl FileOpener for ParquetOpener {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
                     &physical_file_schema,
-                    &logical_file_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
-                    &schema_adapter_factory,
                 );

                 match row_filter {
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index db455fed61606..9dac0a89b4898 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -67,6 +67,7 @@ use arrow::array::BooleanArray;
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
+use itertools::Itertools;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
@@ -74,9 +75,8 @@ use parquet::file::metadata::ParquetMetaData;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::Result;
-use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
 use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::utils::reassign_predicate_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use datafusion_physical_plan::metrics;

@@ -106,8 +106,6 @@ pub(crate) struct DatafusionArrowPredicate {
     rows_matched: metrics::Count,
     /// how long was spent evaluating this predicate
     time: metrics::Time,
-    /// used to perform type coercion while filtering rows
-    schema_mapper: Arc<dyn SchemaMapper>,
 }

 impl DatafusionArrowPredicate {
@@ -132,7 +130,6 @@ impl DatafusionArrowPredicate {
             rows_pruned,
             rows_matched,
             time,
-            schema_mapper: candidate.schema_mapper,
         })
     }
 }
@@ -143,8 +140,6 @@ impl ArrowPredicate for DatafusionArrowPredicate {
     }

     fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
-        let batch = self.schema_mapper.map_batch(batch)?;
-
         // scoped timer updates on drop
         let mut timer = self.time.timer();

@@ -187,9 +182,6 @@ pub(crate) struct FilterCandidate {
     /// required to pass through a `SchemaMapper` to the table schema
     /// upon which we then evaluate the filter expression.
     projection: Vec<usize>,
-    /// A `SchemaMapper` used to map batches read from the file schema to
-    /// the filter's projection of the table schema.
-    schema_mapper: Arc<dyn SchemaMapper>,
     /// The projected table schema that this filter references
     filter_schema: SchemaRef,
 }
@@ -230,25 +222,16 @@ struct FilterCandidateBuilder {
     /// columns in the file schema that are not in the table schema or columns that
     /// are in the table schema that are not in the file schema.
     file_schema: SchemaRef,
-    /// The schema of the table (merged schema) -- columns may be in different
-    /// order than in the file and have columns that are not in the file schema
-    table_schema: SchemaRef,
-    /// A `SchemaAdapterFactory` used to map the file schema to the table schema.
-    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
 }

 impl FilterCandidateBuilder {
     pub fn new(
         expr: Arc<dyn PhysicalExpr>,
         file_schema: Arc<Schema>,
-        table_schema: Arc<Schema>,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
     ) -> Self {
         Self {
             expr,
             file_schema,
-            table_schema,
-            schema_adapter_factory,
         }
     }

@@ -261,20 +244,17 @@ impl FilterCandidateBuilder {
     /// * `Err(e)` if an error occurs while building the candidate
     pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
         let Some(required_indices_into_table_schema) =
-            pushdown_columns(&self.expr, &self.table_schema)?
+            pushdown_columns(&self.expr, &self.file_schema)?
         else {
             return Ok(None);
         };

         let projected_table_schema = Arc::new(
-            self.table_schema
+            self.file_schema
                 .project(&required_indices_into_table_schema)?,
         );

-        let (schema_mapper, projection_into_file_schema) = self
-            .schema_adapter_factory
-            .create(Arc::clone(&projected_table_schema), self.table_schema)
-            .map_schema(&self.file_schema)?;
+        let projection_into_file_schema = collect_columns(&self.expr).iter().map(|c| c.index()).sorted_unstable().collect_vec();

         let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?;
         let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?;
@@ -284,7 +264,6 @@ impl FilterCandidateBuilder {
             required_bytes,
             can_use_index,
             projection: projection_into_file_schema,
-            schema_mapper: Arc::clone(&schema_mapper),
             filter_schema: Arc::clone(&projected_table_schema),
         }))
     }
@@ -426,11 +405,9 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
 pub fn build_row_filter(
     expr: &Arc<dyn PhysicalExpr>,
     physical_file_schema: &SchemaRef,
-    logical_file_schema: &SchemaRef,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
     file_metrics: &ParquetFileMetrics,
-    schema_adapter_factory: &Arc<dyn SchemaAdapterFactory>,
 ) -> Result<Option<RowFilter>> {
     let rows_pruned = &file_metrics.pushdown_rows_pruned;
     let rows_matched = &file_metrics.pushdown_rows_matched;
@@ -447,8 +424,6 @@ pub fn build_row_filter(
                 FilterCandidateBuilder::new(
                     Arc::clone(expr),
                     Arc::clone(physical_file_schema),
-                    Arc::clone(logical_file_schema),
-                    Arc::clone(schema_adapter_factory),
                 )
                 .build(metadata)
             })
@@ -492,13 +467,9 @@ mod test {
     use super::*;
     use datafusion_common::ScalarValue;

-    use arrow::datatypes::{Field, TimeUnit::Nanosecond};
-    use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
     use datafusion_expr::{col, Expr};
     use datafusion_physical_expr::planner::logical2physical;
-    use datafusion_physical_plan::metrics::{Count, Time};
-    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use parquet::arrow::parquet_to_arrow_schema;
     use parquet::file::reader::{FileReader, SerializedFileReader};

@@ -520,14 +491,11 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);

-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
         let table_schema = Arc::new(table_schema.clone());

         let candidate = FilterCandidateBuilder::new(
             expr,
             table_schema.clone(),
-            table_schema,
-            schema_adapter_factory,
         )
         .build(metadata)
         .expect("building candidate");

         assert!(candidate.is_none());
     }

-    #[test]
-    fn test_filter_type_coercion() {
-        let testdata = datafusion_common::test_util::parquet_test_data();
-        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
-            .expect("opening file");
-
-        let parquet_reader_builder =
-            ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
-        let metadata = parquet_reader_builder.metadata().clone();
-        let file_schema = parquet_reader_builder.schema().clone();
-
-        // This is the schema we would like to
coerce to, - // which is different from the physical schema of the file. - let table_schema = Schema::new(vec![Field::new( - "timestamp_col", - DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))), - false, - )]); - - // Test all should fail - let expr = col("timestamp_col").lt(Expr::Literal( - ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))), - None, - )); - let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); - let table_schema = Arc::new(table_schema.clone()); - let candidate = FilterCandidateBuilder::new( - expr, - file_schema.clone(), - table_schema.clone(), - schema_adapter_factory, - ) - .build(&metadata) - .expect("building candidate") - .expect("candidate expected"); - - let mut row_filter = DatafusionArrowPredicate::try_new( - candidate, - &metadata, - Count::new(), - Count::new(), - Time::new(), - ) - .expect("creating filter predicate"); - - let mut parquet_reader = parquet_reader_builder - .with_projection(row_filter.projection().clone()) - .build() - .expect("building reader"); - - // Parquet file is small, we only need 1 record batch - let first_rb = parquet_reader - .next() - .expect("expected record batch") - .expect("expected error free record batch"); - - let filtered = row_filter.evaluate(first_rb.clone()); - assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8]))); - - // Test all should pass - let expr = col("timestamp_col").gt(Expr::Literal( - ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))), - None, - )); - let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); - let candidate = FilterCandidateBuilder::new( - expr, - file_schema, - table_schema, - schema_adapter_factory, - ) - .build(&metadata) - .expect("building candidate") - .expect("candidate expected"); - - let mut row_filter = DatafusionArrowPredicate::try_new( - candidate, - &metadata, - Count::new(), - Count::new(), - Time::new(), - ) - .expect("creating filter predicate"); - - let filtered = row_filter.evaluate(first_rb); - assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8]))); - } - #[test] fn nested_data_structures_prevent_pushdown() { let table_schema = Arc::new(get_lists_table_schema()); From ce64271bc918cacb729ad78a3b7302f0e5f790c3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 19 Jun 2025 23:56:02 -0500 Subject: [PATCH 09/26] fmt --- .../datasource-parquet/src/row_filter.rs | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 9dac0a89b4898..5626f83186e31 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -225,14 +225,8 @@ struct FilterCandidateBuilder { } impl FilterCandidateBuilder { - pub fn new( - expr: Arc, - file_schema: Arc, - ) -> Self { - Self { - expr, - file_schema, - } + pub fn new(expr: Arc, file_schema: Arc) -> Self { + Self { expr, file_schema } } /// Attempt to build a `FilterCandidate` from the expression @@ -254,7 +248,11 @@ impl FilterCandidateBuilder { .project(&required_indices_into_table_schema)?, ); - let projection_into_file_schema = collect_columns(&self.expr).iter().map(|c| c.index()).sorted_unstable().collect_vec(); + let projection_into_file_schema = collect_columns(&self.expr) + .iter() + .map(|c| 
c.index()) + .sorted_unstable() + .collect_vec(); let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?; let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?; @@ -493,12 +491,9 @@ mod test { let table_schema = Arc::new(table_schema.clone()); - let candidate = FilterCandidateBuilder::new( - expr, - table_schema.clone(), - ) - .build(metadata) - .expect("building candidate"); + let candidate = FilterCandidateBuilder::new(expr, table_schema.clone()) + .build(metadata) + .expect("building candidate"); assert!(candidate.is_none()); } From 403a98b440fc2645f79c6254ba72e1f683751e61 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 21 Jun 2025 10:41:46 -0500 Subject: [PATCH 10/26] address PR feedback --- datafusion/datasource-parquet/src/opener.rs | 69 +--- datafusion/physical-expr/src/lib.rs | 2 + .../physical-expr/src/schema_rewriter.rs | 295 ++++++++++++++++++ 3 files changed, 302 insertions(+), 64 deletions(-) create mode 100644 datafusion/physical-expr/src/schema_rewriter.rs diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index c584b1ae060eb..2911adedca050 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -25,8 +25,6 @@ use crate::{ apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, }; -use arrow::compute::can_cast_types; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; @@ -539,7 +537,7 @@ fn should_enable_page_index( .unwrap_or(false) } -use datafusion_physical_expr::expressions; +use datafusion_physical_expr::PhysicalExprSchemaRewriter; /// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that /// is cast to the specified data type. @@ -553,68 +551,11 @@ pub fn cast_expr_to_schema( partition_values: Vec, partition_fields: &[FieldRef], ) -> Result> { - expr.transform(|expr| { - if let Some(column) = expr.as_any().downcast_ref::() { - let logical_field = match logical_file_schema.field_with_name(column.name()) { - Ok(field) => field, - Err(e) => { - // If the column is a partition field, we can use the partition value - for (partition_field, partition_value) in - partition_fields.iter().zip(partition_values.iter()) - { - if partition_field.name() == column.name() { - return Ok(Transformed::yes(expressions::lit( - partition_value.clone(), - ))); - } - } - // If the column is not found in the logical schema, return an error - // This should probably never be hit unless something upstream broke, but nontheless it's better - // for us to return a handleable error than to panic / do something unexpected. - return Err(e.into()); - } - }; - let Ok(physical_field) = physical_file_schema.field_with_name(column.name()) - else { - if !logical_field.is_nullable() { - return exec_err!( - "Non-nullable column '{}' is missing from the physical schema", - column.name() - ); - } - // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. - // TODO: do we need to sync this with what the `SchemaAdapter` actually does? 
- // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else! - let value = ScalarValue::Null.cast_to(logical_field.data_type())?; - return Ok(Transformed::yes(expressions::lit(value))); - }; - - if logical_field.data_type() == physical_field.data_type() { - return Ok(Transformed::no(expr)); - } - - // If the logical field and physical field are different, we need to cast - // the column to the logical field's data type. - // We will try later to move the cast to literal values if possible, which is computationally cheaper. - if !can_cast_types(logical_field.data_type(), physical_field.data_type()) { - return exec_err!( - "Cannot cast column '{}' from '{}' (file data type) to '{}' (table data type)", - column.name(), - logical_field.data_type(), - physical_field.data_type() - ); - } - let casted_expr = Arc::new(expressions::CastExpr::new( - expr, - logical_field.data_type().clone(), - None, - )); - return Ok(Transformed::yes(casted_expr)); - } + let rewriter = + PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema) + .with_partition_columns(partition_fields.to_vec(), partition_values); - Ok(Transformed::no(expr)) - }) - .data() + rewriter.rewrite(expr) } #[cfg(test)] diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index be60e26cc2d2a..3bdb9d84d8278 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -38,6 +38,7 @@ mod partitioning; mod physical_expr; pub mod planner; mod scalar_function; +pub mod schema_rewriter; pub mod statistics; pub mod utils; pub mod window; @@ -68,6 +69,7 @@ pub use datafusion_physical_expr_common::sort_expr::{ pub use planner::{create_physical_expr, create_physical_exprs}; pub use scalar_function::ScalarFunctionExpr; +pub use schema_rewriter::PhysicalExprSchemaRewriter; pub use utils::{conjunction, conjunction_opt, split_conjunction}; // For backwards compatibility diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs new file mode 100644 index 0000000000000..7d51042e134b9 --- /dev/null +++ b/datafusion/physical-expr/src/schema_rewriter.rs @@ -0,0 +1,295 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical expression schema rewriting utilities + +use std::sync::Arc; + +use arrow::compute::can_cast_types; +use arrow::datatypes::{FieldRef, Schema}; +use datafusion_common::{ + exec_err, + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, ScalarValue, +}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + +use crate::expressions::{self, CastExpr, Column}; + +/// Builder for rewriting physical expressions to match different schemas. 
+///
+/// # Example
+///
+/// ```rust
+/// use datafusion_physical_expr::schema_rewriter::PhysicalExprSchemaRewriter;
+/// use arrow::datatypes::Schema;
+///
+/// # fn example(
+/// #     predicate: std::sync::Arc<dyn datafusion_physical_expr::PhysicalExpr>,
+/// #     physical_file_schema: &Schema,
+/// #     logical_file_schema: &Schema,
+/// # ) -> datafusion_common::Result<()> {
+/// let rewriter = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema);
+/// let adapted_predicate = rewriter.rewrite(predicate)?;
+/// # Ok(())
+/// # }
+/// ```
+pub struct PhysicalExprSchemaRewriter<'a> {
+    physical_file_schema: &'a Schema,
+    logical_file_schema: &'a Schema,
+    partition_fields: Vec<FieldRef>,
+    partition_values: Vec<ScalarValue>,
+}
+
+impl<'a> PhysicalExprSchemaRewriter<'a> {
+    /// Create a new schema rewriter with the given schemas
+    pub fn new(
+        physical_file_schema: &'a Schema,
+        logical_file_schema: &'a Schema,
+    ) -> Self {
+        Self {
+            physical_file_schema,
+            logical_file_schema,
+            partition_fields: Vec::new(),
+            partition_values: Vec::new(),
+        }
+    }
+
+    /// Add partition columns and their corresponding values
+    ///
+    /// When a column reference matches a partition field, it will be replaced
+    /// with the corresponding literal value from partition_values.
+    pub fn with_partition_columns(
+        mut self,
+        partition_fields: Vec<FieldRef>,
+        partition_values: Vec<ScalarValue>,
+    ) -> Self {
+        self.partition_fields = partition_fields;
+        self.partition_values = partition_values;
+        self
+    }
+
+    /// Rewrite the given physical expression to match the target schema
+    ///
+    /// This method applies the following transformations:
+    /// 1. Replaces partition column references with literal values
+    /// 2. Handles missing columns by inserting null literals
+    /// 3. Casts columns when logical and physical schemas have different types
+    pub fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        expr.transform(|expr| self.rewrite_expr(expr)).data()
+    }
+
+    fn rewrite_expr(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            return self.rewrite_column(Arc::clone(&expr), column);
+        }
+
+        Ok(Transformed::no(expr))
+    }
+
+    fn rewrite_column(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+        column: &Column,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        // Check if this is a partition column
+        if let Some(partition_value) = self.get_partition_value(column.name()) {
+            return Ok(Transformed::yes(expressions::lit(partition_value)));
+        }
+
+        // Get the logical field for this column
+        let logical_field = match self.logical_file_schema.field_with_name(column.name())
+        {
+            Ok(field) => field,
+            Err(e) => {
+                return Err(e.into());
+            }
+        };
+
+        // Check if the column exists in the physical schema
+        let physical_field =
+            match self.physical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(_) => {
+                    // Column is missing from physical schema
+                    if !logical_field.is_nullable() {
+                        return exec_err!(
+                            "Non-nullable column '{}' is missing from the physical schema",
+                            column.name()
+                        );
+                    }
+                    // Fill in with null value
+                    let null_value =
+                        ScalarValue::Null.cast_to(logical_field.data_type())?;
+                    return Ok(Transformed::yes(expressions::lit(null_value)));
+                }
+            };
+
+        // Check if casting is needed
+        if logical_field.data_type() == physical_field.data_type() {
+            return Ok(Transformed::no(expr));
+        }
+
+        // Perform type casting
+        if !can_cast_types(physical_field.data_type(), logical_field.data_type()) {
+            return exec_err!(
+                "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
+                column.name(),
+                physical_field.data_type(),
+                logical_field.data_type()
+            );
+        }
+
+        let cast_expr =
+            Arc::new(CastExpr::new(expr, logical_field.data_type().clone(), None));
+
+        Ok(Transformed::yes(cast_expr))
+    }
+
+    fn get_partition_value(&self, column_name: &str) -> Option<ScalarValue> {
+        self.partition_fields
+            .iter()
+            .zip(self.partition_values.iter())
+            .find(|(field, _)| field.name() == column_name)
+            .map(|(_, value)| value.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use std::sync::Arc;
+
+    fn create_test_schema() -> (Schema, Schema) {
+        let physical_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+
+        let logical_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, false), // Different type
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Float64, true), // Missing from physical
+        ]);
+
+        (physical_schema, logical_schema)
+    }
+
+    #[test]
+    fn test_rewrite_column_with_type_cast() -> Result<()> {
+        let (physical_schema, logical_schema) = create_test_schema();
+
+        let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
+        let column_expr = Arc::new(Column::new("a", 0));
+
+        let result = rewriter.rewrite(column_expr)?;
+
+        // Should be wrapped in a cast expression
+        assert!(result.as_any().downcast_ref::<CastExpr>().is_some());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_rewrite_missing_column() -> Result<()> {
+        let (physical_schema, logical_schema) = create_test_schema();
+
+        let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
+        let column_expr = Arc::new(Column::new("c", 2));
+
+        let result = rewriter.rewrite(column_expr)?;
+
+        // Should be replaced with a literal null
+        if let Some(literal) = result.as_any().downcast_ref::<expressions::Literal>() {
+            assert_eq!(*literal.value(), ScalarValue::Float64(None));
+        } else {
+            panic!("Expected literal expression");
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_rewrite_partition_column() -> Result<()> {
+        let (physical_schema, logical_schema) = create_test_schema();
+
+        let partition_fields =
+            vec![Arc::new(Field::new("partition_col", DataType::Utf8, false))];
+        let partition_values = vec![ScalarValue::Utf8(Some("test_value".to_string()))];
+
+        let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema)
+            .with_partition_columns(partition_fields, partition_values);
+
+        let column_expr = Arc::new(Column::new("partition_col", 0));
+        let result = rewriter.rewrite(column_expr)?;
+
+        // Should be replaced with the partition value
+        if let Some(literal) = result.as_any().downcast_ref::<expressions::Literal>() {
+            assert_eq!(
+                *literal.value(),
+                ScalarValue::Utf8(Some("test_value".to_string()))
+            );
+        } else {
+            panic!("Expected literal expression");
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_rewrite_no_change_needed() -> Result<()> {
+        let (physical_schema, logical_schema) = create_test_schema();
+
+        let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
+        let column_expr = Arc::new(Column::new("b", 1));
+
+        let result = rewriter.rewrite(column_expr.clone())?;
+
+        // Should be the same expression (no transformation needed)
+        // We compare the underlying pointer through the trait object
+        assert!(std::ptr::eq(
+            column_expr.as_ref() as *const dyn PhysicalExpr,
+            result.as_ref() as *const dyn PhysicalExpr
+        ));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_non_nullable_missing_column_error() {
+        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let logical_schema =
Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), // Non-nullable missing column + ]); + + let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema); + let column_expr = Arc::new(Column::new("b", 1)); + + let result = rewriter.rewrite(column_expr); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Non-nullable column 'b' is missing")); + } +} From 15b030fedd2f8e4612c163188e46c9f4d1463be0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 21 Jun 2025 11:04:28 -0500 Subject: [PATCH 11/26] cleanup --- datafusion/datasource-parquet/src/opener.rs | 36 ++++----------------- 1 file changed, 7 insertions(+), 29 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 2911adedca050..7b0ad9ab90ce5 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -35,9 +35,10 @@ use datafusion_common::pruning::{ CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics, PruningStatistics, }; -use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_common::{exec_err, Result}; use datafusion_datasource::PartitionedFile; use datafusion_physical_expr::utils::reassign_predicate_columns; +use datafusion_physical_expr::PhysicalExprSchemaRewriter; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; @@ -250,13 +251,11 @@ impl FileOpener for ParquetOpener { let predicate = predicate .map(|p| { - cast_expr_to_schema( - p, - &physical_file_schema, - &logical_file_schema, - file.partition_values, - &partition_fields, - ) + let rewriter = + PhysicalExprSchemaRewriter::new(&physical_file_schema, &logical_file_schema) + .with_partition_columns(partition_fields.to_vec(), file.partition_values); + + rewriter.rewrite(p) .map_err(ArrowError::from) .map(|p| reassign_predicate_columns(p, &physical_file_schema, false)) }) @@ -537,27 +536,6 @@ fn should_enable_page_index( .unwrap_or(false) } -use datafusion_physical_expr::PhysicalExprSchemaRewriter; - -/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that -/// is cast to the specified data type. -/// Preference is always given to casting literal values to the data type of the column -/// since casting the column to the literal value's data type can be significantly more expensive. -/// Given two columns the cast is applied arbitrarily to the first column. 
-pub fn cast_expr_to_schema( - expr: Arc, - physical_file_schema: &Schema, - logical_file_schema: &Schema, - partition_values: Vec, - partition_fields: &[FieldRef], -) -> Result> { - let rewriter = - PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema) - .with_partition_columns(partition_fields.to_vec(), partition_values); - - rewriter.rewrite(expr) -} - #[cfg(test)] mod test { use std::sync::Arc; From 8f5f150ea30f4f74b08ce91bbd32641f904c605c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 21 Jun 2025 11:05:26 -0500 Subject: [PATCH 12/26] remove unecessary reassign --- datafusion/datasource-parquet/src/opener.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 7b0ad9ab90ce5..b41090305222d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -37,7 +37,6 @@ use datafusion_common::pruning::{ }; use datafusion_common::{exec_err, Result}; use datafusion_datasource::PartitionedFile; -use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr::PhysicalExprSchemaRewriter; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; @@ -257,9 +256,7 @@ impl FileOpener for ParquetOpener { rewriter.rewrite(p) .map_err(ArrowError::from) - .map(|p| reassign_predicate_columns(p, &physical_file_schema, false)) }) - .transpose()? .transpose()?; // Build predicates for this specific file From b1d3b0cfd74cdd3d3cdbd81c71f0f9322ec91302 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 21 Jun 2025 11:06:08 -0500 Subject: [PATCH 13/26] fmt --- datafusion/datasource-parquet/src/opener.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index b41090305222d..9ea917b56a0be 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -250,12 +250,16 @@ impl FileOpener for ParquetOpener { let predicate = predicate .map(|p| { - let rewriter = - PhysicalExprSchemaRewriter::new(&physical_file_schema, &logical_file_schema) - .with_partition_columns(partition_fields.to_vec(), file.partition_values); + let rewriter = PhysicalExprSchemaRewriter::new( + &physical_file_schema, + &logical_file_schema, + ) + .with_partition_columns( + partition_fields.to_vec(), + file.partition_values, + ); - rewriter.rewrite(p) - .map_err(ArrowError::from) + rewriter.rewrite(p).map_err(ArrowError::from) }) .transpose()?; From a0dcae9309a6c286f045f3aa1a02f0a198252a7e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 21 Jun 2025 11:10:11 -0500 Subject: [PATCH 14/26] better comments --- .../physical-expr/src/schema_rewriter.rs | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs index 7d51042e134b9..ba058587680e6 100644 --- a/datafusion/physical-expr/src/schema_rewriter.rs +++ b/datafusion/physical-expr/src/schema_rewriter.rs @@ -109,16 +109,18 @@ impl<'a> PhysicalExprSchemaRewriter<'a> { expr: Arc, column: &Column, ) -> Result>> { - // Check if this is a partition column - if let 
Some(partition_value) = self.get_partition_value(column.name()) {
-            return Ok(Transformed::yes(expressions::lit(partition_value)));
-        }
-
         // Get the logical field for this column
         let logical_field = match self.logical_file_schema.field_with_name(column.name())
         {
             Ok(field) => field,
             Err(e) => {
+                // If the column is a partition field, we can use the partition value
+                if let Some(partition_value) = self.get_partition_value(column.name()) {
+                    return Ok(Transformed::yes(expressions::lit(partition_value)));
+                }
+                // If the column is not found in the logical schema and is not a partition value, return an error
+                // This should probably never be hit unless something upstream broke, but nonetheless it's better
+                // for us to return a handleable error than to panic / do something unexpected.
                 return Err(e.into());
             }
         };
@@ -128,26 +130,27 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
         // Check if the column exists in the physical schema
         let physical_field =
             match self.physical_file_schema.field_with_name(column.name()) {
                 Ok(field) => field,
                 Err(_) => {
-                    // Column is missing from physical schema
                     if !logical_field.is_nullable() {
                         return exec_err!(
                             "Non-nullable column '{}' is missing from the physical schema",
                             column.name()
                         );
                     }
-                    // Fill in with null value
+                    // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do.
+                    // TODO: do we need to sync this with what the `SchemaAdapter` actually does?
+                    // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else!
                     let null_value =
                         ScalarValue::Null.cast_to(logical_field.data_type())?;
                     return Ok(Transformed::yes(expressions::lit(null_value)));
                 }
             };

-        // Check if casting is needed
+        // If the logical field and physical field are different, we need to cast
+        // the column to the logical field's data type.
+        // We will try later to move the cast to literal values if possible, which is computationally cheaper.
         if logical_field.data_type() == physical_field.data_type() {
             return Ok(Transformed::no(expr));
         }
-
-        // Perform type casting
         if !can_cast_types(physical_field.data_type(), logical_field.data_type()) {
             return exec_err!(
                 "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",

From ce47bd8a010bdd4cdbbd40324e06b0b1e4406eb3 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sat, 21 Jun 2025 11:10:39 -0500
Subject: [PATCH 15/26] Revert "remove unecessary reassign"

This reverts commit 4cef38fb7f8909f5492bd989af4a20f60ca7fa62.

---
 datafusion/datasource-parquet/src/opener.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 9ea917b56a0be..01900f853302a 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -37,6 +37,7 @@ use datafusion_common::pruning::{
 };
 use datafusion_common::{exec_err, Result};
 use datafusion_datasource::PartitionedFile;
+use datafusion_physical_expr::utils::reassign_predicate_columns;
 use datafusion_physical_expr::PhysicalExprSchemaRewriter;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::pruning::PruningPredicate;
@@ -259,8 +260,11 @@ impl FileOpener for ParquetOpener {
                     file.partition_values,
                 );

-                rewriter.rewrite(p).map_err(ArrowError::from)
+                rewriter.rewrite(p)
+                    .map_err(ArrowError::from)
+                    .map(|p| reassign_predicate_columns(p, &physical_file_schema, false))
             })
+            .transpose()?
.transpose()?; // Build predicates for this specific file From 7f0de738c4af0cfe31fda977d75b4d80ca0ff90d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 21 Jun 2025 11:31:21 -0500 Subject: [PATCH 16/26] handle indexes internally --- datafusion/datasource-parquet/src/opener.rs | 10 +++---- .../physical-expr/src/schema_rewriter.rs | 27 +++++++++++++------ 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 01900f853302a..0e8e19b692941 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -37,7 +37,6 @@ use datafusion_common::pruning::{ }; use datafusion_common::{exec_err, Result}; use datafusion_datasource::PartitionedFile; -use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr::PhysicalExprSchemaRewriter; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; @@ -251,20 +250,17 @@ impl FileOpener for ParquetOpener { let predicate = predicate .map(|p| { - let rewriter = PhysicalExprSchemaRewriter::new( + PhysicalExprSchemaRewriter::new( &physical_file_schema, &logical_file_schema, ) .with_partition_columns( partition_fields.to_vec(), file.partition_values, - ); - - rewriter.rewrite(p) + ) + .rewrite(p) .map_err(ArrowError::from) - .map(|p| reassign_predicate_columns(p, &physical_file_schema, false)) }) - .transpose()? .transpose()?; // Build predicates for this specific file diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs index ba058587680e6..9a7949402f863 100644 --- a/datafusion/physical-expr/src/schema_rewriter.rs +++ b/datafusion/physical-expr/src/schema_rewriter.rs @@ -98,7 +98,7 @@ impl<'a> PhysicalExprSchemaRewriter<'a> { expr: Arc, ) -> Result>> { if let Some(column) = expr.as_any().downcast_ref::() { - return self.rewrite_column(Arc::clone(&expr), column); + return self.rewrite_column(Arc::clone(&expr), column) } Ok(Transformed::no(expr)) @@ -126,9 +126,8 @@ impl<'a> PhysicalExprSchemaRewriter<'a> { }; // Check if the column exists in the physical schema - let physical_field = - match self.physical_file_schema.field_with_name(column.name()) { - Ok(field) => field, + let physical_column_index = match self.physical_file_schema.index_of(column.name()) { + Ok(index) => index, Err(_) => { if !logical_field.is_nullable() { return exec_err!( @@ -144,13 +143,25 @@ impl<'a> PhysicalExprSchemaRewriter<'a> { return Ok(Transformed::yes(expressions::lit(null_value))); } }; + let physical_field = self.physical_file_schema.field(physical_column_index); + + let column = match (column.index() == physical_column_index, logical_field.data_type() == physical_field.data_type()) { + // If the column index matches and the data types match, we can use the column as is + (true, true) => return Ok(Transformed::no(expr)), + // If the indexes or data types do not match, we need to create a new column expression + (true, _) => column.clone(), + (false, _) => Column::new_with_schema(logical_field.name(), self.logical_file_schema)? + }; - // If the logical field and physical field are different, we need to cast - // the column to the logical field's data type. - // We will try later to move the cast to literal values if possible, which is computationally cheaper. 
         if logical_field.data_type() == physical_field.data_type() {
-            return Ok(Transformed::no(expr));
+            // If the data types match, we can use the column as is
+            return Ok(Transformed::yes(Arc::new(column)));
         }
+
+        // We need to cast the column to the logical data type
+        // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
+        // since that's much cheaper to evaluate.
+        // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
         if !can_cast_types(physical_field.data_type(), logical_field.data_type()) {
             return exec_err!(
                 "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",

From f141fa96e89acd59d00d8761c716f0342a10459b Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sat, 21 Jun 2025 11:34:01 -0500
Subject: [PATCH 17/26] refactor

---
 datafusion/physical-expr/src/schema_rewriter.rs | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index 9a7949402f863..8924a76860bb8 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -98,7 +98,7 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
         expr: Arc<dyn PhysicalExpr>,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
         if let Some(column) = expr.as_any().downcast_ref::<Column>() {
-            return self.rewrite_column(Arc::clone(&expr), column)
+            return self.rewrite_column(Arc::clone(&expr), column);
         }

         Ok(Transformed::no(expr))
@@ -126,7 +126,8 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
         };

         // Check if the column exists in the physical schema
-        let physical_column_index = match self.physical_file_schema.index_of(column.name()) {
+        let physical_column_index =
+            match self.physical_file_schema.index_of(column.name()) {
                 Ok(index) => index,
                 Err(_) => {
                     if !logical_field.is_nullable() {
@@ -144,13 +145,18 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
             }
         };
         let physical_field = self.physical_file_schema.field(physical_column_index);
-
-        let column = match (column.index() == physical_column_index, logical_field.data_type() == physical_field.data_type()) {
+
+        let column = match (
+            column.index() == physical_column_index,
+            logical_field.data_type() == physical_field.data_type(),
+        ) {
             // If the column index matches and the data types match, we can use the column as is
             (true, true) => return Ok(Transformed::no(expr)),
             // If the indexes or data types do not match, we need to create a new column expression
             (true, _) => column.clone(),
-            (false, _) => Column::new_with_schema(logical_field.name(), self.logical_file_schema)?
+            (false, _) => {
+                Column::new_with_schema(logical_field.name(), self.logical_file_schema)?
+            }
         };

From c97fa32b83ea16d96d2e014c89b27935376679d1 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sat, 21 Jun 2025 11:43:32 -0500
Subject: [PATCH 18/26] fix

---
 datafusion/physical-expr/src/schema_rewriter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index 8924a76860bb8..bccc85b1e5c8e 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -155,7 +155,7 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
             // If the indexes or data types do not match, we need to create a new column expression
             (true, _) => column.clone(),
             (false, _) => {
-                Column::new_with_schema(logical_field.name(), self.logical_file_schema)?
+                Column::new_with_schema(logical_field.name(), self.physical_file_schema)?
             }
         };

From 4dcb3d2762a21bac37dbb11fd33af59d40678411 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sun, 22 Jun 2025 00:45:12 -0500
Subject: [PATCH 19/26] fix

---
 datafusion/physical-expr/src/schema_rewriter.rs | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index bccc85b1e5c8e..53af908624350 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -177,8 +177,11 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
             );
         }
 
-        let cast_expr =
-            Arc::new(CastExpr::new(expr, logical_field.data_type().clone(), None));
+        let cast_expr = Arc::new(CastExpr::new(
+            Arc::new(column),
+            logical_field.data_type().clone(),
+            None,
+        ));
 
         Ok(Transformed::yes(cast_expr))
     }
@@ -280,9 +283,9 @@ mod tests {
         let (physical_schema, logical_schema) = create_test_schema();
         let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
 
-        let column_expr = Arc::new(Column::new("b", 1));
+        let column_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
 
-        let result = rewriter.rewrite(column_expr.clone())?;
+        let result = rewriter.rewrite(Arc::clone(&column_expr))?;
 
         // Should be the same expression (no transformation needed)
         // We compare the underlying pointer through the trait object

From ce47bd8a010bdd4cdbbd40324e06b0b1e4406eb3 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Mon, 23 Jun 2025 22:03:18 -0500
Subject: [PATCH 20/26] add comment

---
 datafusion/datasource-parquet/src/opener.rs   |  2 +
 .../physical-expr/src/schema_rewriter.rs      | 60 ++++++++++++++++++-
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 0e8e19b692941..69ea7a4b7896a 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -248,6 +248,8 @@ impl FileOpener for ParquetOpener {
             }
         }
 
+        // Adapt the predicate to the physical file schema.
+        // This replaces missing columns with null literals and inserts any necessary casts.
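+        // For example (illustrative): with a file schema of {a: Int32} and a table
+        // schema of {a: Int64, b: Utf8}, a predicate like `b = 'x' AND a = 5` is
+        // rewritten for this file to `NULL = 'x' AND CAST(a@0 AS Int64) = 5`.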
         let predicate = predicate
             .map(|p| {
                 PhysicalExprSchemaRewriter::new(
diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index 53af908624350..8c7a3b2bca477 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -139,6 +139,7 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
                 // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do.
                 // TODO: do we need to sync this with what the `SchemaAdapter` actually does?
                 // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else!
+                // See https://github.com/apache/datafusion/issues/16527
                 let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
                 return Ok(Transformed::yes(expressions::lit(null_value)));
@@ -197,9 +198,12 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
 
 #[cfg(test)]
 mod tests {
+    use crate::expressions::lit;
+
     use super::*;
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::ScalarValue;
+    use datafusion_expr::Operator;
     use std::sync::Arc;
 
     fn create_test_schema() -> (Schema, Schema) {
@@ -218,18 +222,68 @@ mod tests {
     }
 
     #[test]
-    fn test_rewrite_column_with_type_cast() -> Result<()> {
+    fn test_rewrite_column_with_type_cast() {
         let (physical_schema, logical_schema) = create_test_schema();
         let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
         let column_expr = Arc::new(Column::new("a", 0));
 
-        let result = rewriter.rewrite(column_expr)?;
+        let result = rewriter.rewrite(column_expr).unwrap();
 
         // Should be wrapped in a cast expression
         assert!(result.as_any().downcast_ref::<CastExpr>().is_some());
+    }
 
-        Ok(())
+    #[test]
+    fn test_rewrite_multi_column_expr_with_type_cast() {
+        let (physical_schema, logical_schema) = create_test_schema();
+        let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
+
+        // Create a complex expression: (a + 5) OR (c > 0.0) that tests the recursive case of the rewriter
+        let column_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
+        let column_c = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;
+        let expr = expressions::BinaryExpr::new(
+            Arc::clone(&column_a),
+            Operator::Plus,
+            Arc::new(expressions::Literal::new(ScalarValue::Int64(Some(5)))),
+        );
+        let expr = expressions::BinaryExpr::new(
+            Arc::new(expr),
+            Operator::Or,
+            Arc::new(expressions::BinaryExpr::new(
+                Arc::clone(&column_c),
+                Operator::Gt,
+                Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
+            )),
+        );
+
+        let result = rewriter.rewrite(Arc::new(expr)).unwrap();
+        println!("Rewritten expression: {}", result);
+
+        let expected = expressions::BinaryExpr::new(
+            Arc::new(CastExpr::new(
+                Arc::new(Column::new("a", 0)),
+                DataType::Int64,
+                None,
+            )),
+            Operator::Plus,
+            Arc::new(expressions::Literal::new(ScalarValue::Int64(Some(5)))),
+        );
+        let expected = Arc::new(expressions::BinaryExpr::new(
+            Arc::new(expected),
+            Operator::Or,
+            Arc::new(expressions::BinaryExpr::new(
+                lit(ScalarValue::Null),
+                Operator::Gt,
+                Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
+            )),
+        )) as Arc<dyn PhysicalExpr>;
+
+        assert_eq!(
+            result.to_string(),
+            expected.to_string(),
+            "The rewritten expression did not match the expected output"
+        );
     }
 
     #[test]

From dc1005d701608541d3cded27f45312e2353180f3 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Mon, 23 Jun 2025 22:27:08 -0500
Subject: [PATCH 21/26] clippy

---
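For reference, a minimal sketch of driving the rewriter end to end; the schemas
and the predicate here are invented for illustration, and errors are deferred
to the caller:

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{self, Column};
use datafusion_physical_expr::PhysicalExprSchemaRewriter;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

fn rewrite_example() -> Result<Arc<dyn PhysicalExpr>> {
    // The file was written with `a: Int32`; the table schema has since evolved
    // to `a: Int64`.
    let physical = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let logical = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    // `a = 5`, planned against the logical schema.
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(expressions::BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        expressions::lit(ScalarValue::Int64(Some(5))),
    ));

    // The rewrite runs through `TreeNode::transform`, so only the `Column`
    // leaves are touched and the `BinaryExpr` above them is rebuilt
    // automatically; the result displays as `CAST(a@0 AS Int64) = 5`.
    PhysicalExprSchemaRewriter::new(&physical, &logical).rewrite(predicate)
}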
 datafusion/physical-expr/src/schema_rewriter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index 8c7a3b2bca477..e66b41171e617 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -258,7 +258,7 @@ mod tests {
         );
 
         let result = rewriter.rewrite(Arc::new(expr)).unwrap();
-        println!("Rewritten expression: {}", result);
+        println!("Rewritten expression: {result}");
 
         let expected = expressions::BinaryExpr::new(
             Arc::new(CastExpr::new(

From 866c444fc78f76e85ca5ff4abbfd7346300d00a7 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Tue, 24 Jun 2025 09:16:07 -0500
Subject: [PATCH 22/26] add example for RecordBatch adaptation

---
 .../physical-expr/src/schema_rewriter.rs      | 101 +++++++++++++++++-
 1 file changed, 98 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index e66b41171e617..d34fe6998abd8 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -198,12 +198,16 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
 
 #[cfg(test)]
 mod tests {
-    use crate::expressions::lit;
+    use crate::expressions::{col, lit};
 
     use super::*;
-    use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion_common::ScalarValue;
+    use arrow::{
+        array::{RecordBatch, RecordBatchOptions},
+        datatypes::{DataType, Field, Schema, SchemaRef},
+    };
+    use datafusion_common::{record_batch, ScalarValue};
     use datafusion_expr::Operator;
+    use itertools::Itertools;
     use std::sync::Arc;
 
     fn create_test_schema() -> (Schema, Schema) {
@@ -369,4 +373,95 @@ mod tests {
             .to_string()
             .contains("Non-nullable column 'b' is missing"));
     }
+
+    /// Stolen from ProjectionExec
+    fn batch_project(
+        expr: Vec<Arc<dyn PhysicalExpr>>,
+        batch: &RecordBatch,
+        schema: SchemaRef,
+    ) -> Result<RecordBatch> {
+        // Records time on drop
+        let arrays = expr
+            .iter()
+            .map(|expr| {
+                expr.evaluate(batch)
+                    .and_then(|v| v.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        if arrays.is_empty() {
+            let options =
+                RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+            RecordBatch::try_new_with_options(Arc::clone(&schema), arrays, &options)
+                .map_err(Into::into)
+        } else {
+            RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(Into::into)
+        }
+    }
+
+    /// Example showing how we can use the `PhysicalExprSchemaRewriter` to adapt RecordBatches during a scan
+    /// to apply projections, perform type conversions, and handle missing columns all at once.
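+    /// The flow below mirrors what a scan does: rewrite each projection expression
+    /// against the file schema, derive the output schema from the rewritten
+    /// expressions, and then evaluate those expressions against the file's batches.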
+    #[test]
+    fn test_adapt_batches() {
+        let physical_batch = record_batch!(
+            ("a", Int32, vec![Some(1), None, Some(3)]),
+            ("extra", Utf8, vec![Some("x"), Some("y"), None])
+        )
+        .unwrap();
+
+        let physical_schema = physical_batch.schema();
+
+        let logical_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, true), // Different type
+            Field::new("b", DataType::Utf8, true), // Missing from physical
+        ]));
+
+        let projection = vec![
+            col("b", &logical_schema).unwrap(),
+            col("a", &logical_schema).unwrap(),
+        ];
+
+        let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
+
+        let adapted_projection = projection
+            .into_iter()
+            .map(|expr| rewriter.rewrite(expr).unwrap())
+            .collect_vec();
+
+        let adapted_schema = Arc::new(Schema::new(
+            adapted_projection
+                .iter()
+                .map(|expr| expr.return_field(&physical_schema).unwrap())
+                .collect_vec()
+        ));
+
+        let res = batch_project(
+            adapted_projection,
+            &physical_batch,
+            Arc::clone(&adapted_schema),
+        )
+        .unwrap();
+
+        assert_eq!(res.num_columns(), 2);
+        assert_eq!(res.column(0).data_type(), &DataType::Utf8);
+        assert_eq!(res.column(1).data_type(), &DataType::Int64);
+        assert_eq!(
+            res.column(0)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap()
+                .iter()
+                .collect_vec(),
+            vec![None, None, None]
+        );
+        assert_eq!(
+            res.column(1)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap()
+                .iter()
+                .collect_vec(),
+            vec![Some(1), None, Some(3)]
+        );
+    }
 }

From 9c71b6dbf067c3a85dde1dd816757ff42e755f4a Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Tue, 24 Jun 2025 09:18:11 -0500
Subject: [PATCH 23/26] fmt

---
 datafusion/physical-expr/src/schema_rewriter.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index d34fe6998abd8..e12630915df55 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -413,7 +413,7 @@ mod tests {
 
         let logical_schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int64, true), // Different type
-            Field::new("b", DataType::Utf8, true), // Missing from physical
+            Field::new("b", DataType::Utf8, true),  // Missing from physical
         ]));
@@ -432,7 +432,7 @@ mod tests {
             adapted_projection
                 .iter()
                 .map(|expr| expr.return_field(&physical_schema).unwrap())
-                .collect_vec()
+                .collect_vec(),
         ));

From 20aa8354213708f47e33c35975e34a0803f96bfa Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Tue, 24 Jun 2025 09:33:20 -0500
Subject: [PATCH 24/26] comments

---
 datafusion/physical-expr/src/schema_rewriter.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs
index e12630915df55..b8759ea16d6e8 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr/src/schema_rewriter.rs
@@ -374,13 +374,12 @@ mod tests {
             .contains("Non-nullable column 'b' is missing"));
     }
 
-    /// Stolen from ProjectionExec
+    /// Roughly stolen from ProjectionExec
     fn batch_project(
         expr: Vec<Arc<dyn PhysicalExpr>>,
         batch: &RecordBatch,
         schema: SchemaRef,
     ) -> Result<RecordBatch> {
-        // Records time on drop
         let arrays = expr
             .iter()
             .map(|expr| {

From 5e09d6839cfa524e5a3bb706f878c27e7cb9bf77 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 26 Jun 2025 08:32:42 -0500
Subject: [PATCH 25/26] add equivalent test

---
 .../src/datasource/physical_plan/parquet.rs   | 62 ++++++++++++++++++-
 1 file changed, 61 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 8dee79ad61b23..58c7a3648ae60 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -39,7 +39,7 @@ mod tests {
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::{
         ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
-        StringViewArray, StructArray,
+        StringViewArray, StructArray, TimestampNanosecondArray,
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
@@ -960,6 +960,66 @@ mod tests {
         assert_eq!(read, 2, "Expected 2 rows to match the predicate");
     }
 
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_timestamp_units() {
+        // The table schema and the filter literal share a common data type
+        // (millisecond timestamps), but the file schema is in nanoseconds
+        let c1: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
+            Some(1_000_000_000), // 1970-01-01T00:00:01Z
+            Some(2_000_000_000), // 1970-01-01T00:00:02Z
+            Some(3_000_000_000), // 1970-01-01T00:00:03Z
+            Some(4_000_000_000), // 1970-01-01T00:00:04Z
+        ]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+            false,
+        )]));
+        // One row should match, 2 pruned via page index, 1 pruned via filter pushdown
+        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(Some(1_000), Some("UTC".into()))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .with_page_index_predicate() // produces pages with 2 rows each (2 pages total for our data)
+            .with_table_schema(table_schema.clone())
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors and we keep 1 row
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 1, "Expected 1 row to match the predicate");
+        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
+        assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
+        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
+
+        // If we filter with a value that is completely out of the range of the data
+        // we prune at the row group level.
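+        // (The file's values span 1s..4s, so a 5_000ms predicate can never match
+        // and the row group statistics alone are enough to prune the whole file.)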
+        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(Some(5_000), Some("UTC".into()))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .with_table_schema(table_schema)
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors and we keep 0 rows
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 0, "Expected 0 rows to match the predicate");
+        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 1);
+    }
+
     #[tokio::test]
     async fn evolved_schema_disjoint_schema_filter() {
         let c1: ArrayRef =

From fa1f621afe5be1108b12e7df3ad8556b79c6a00d Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 26 Jun 2025 08:34:54 -0500
Subject: [PATCH 26/26] fmt

---
 .../core/src/datasource/physical_plan/parquet.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 58c7a3648ae60..55db0d854204d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -977,7 +977,10 @@ mod tests {
             false,
         )]));
         // One row should match, 2 pruned via page index, 1 pruned via filter pushdown
-        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(Some(1_000), Some("UTC".into()))));
+        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
+            Some(1_000),
+            Some("UTC".into()),
+        )));
         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_pushdown_predicate()
@@ -1000,7 +1003,10 @@ mod tests {
         assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
         // If we filter with a value that is completely out of the range of the data
        // we prune at the row group level.
-        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(Some(5_000), Some("UTC".into()))));
+        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
+            Some(5_000),
+            Some("UTC".into()),
+        )));
         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_pushdown_predicate()