From 3e58165ef87c8c87a5c44dfdb948560c27ab4671 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 18 May 2025 22:58:33 -0400 Subject: [PATCH 1/6] wip --- .../src/datasource/physical_plan/parquet.rs | 64 ++++++++++++++++++- datafusion/datasource-parquet/src/opener.rs | 28 ++++---- .../datasource-parquet/src/row_filter.rs | 4 +- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/physical-optimizer/src/pruning.rs | 3 + 5 files changed, 84 insertions(+), 17 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index e4d5060e065c..bb9b9f5f95d1 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -39,7 +39,7 @@ mod tests { use crate::test::object_store::local_unpartitioned_file; use arrow::array::{ ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray, - StructArray, + StringViewArray, StructArray, }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; @@ -817,7 +817,7 @@ mod tests { } #[tokio::test] - async fn evolved_schema_filter() { + async fn evolved_schema_column_order_filter() { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -848,6 +848,66 @@ mod tests { assert_eq!(read.len(), 0); } + #[tokio::test] + async fn evolved_schema_column_type_filter_strings() { + // The table and filter have a common data type, but the file schema differs + let c1: ArrayRef = + Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")])); + let batch = create_batch(vec![("c1", c1.clone())]); + + let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)])); + + // Predicate should prune all row groups + let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string())))); + let read = RoundTrip::new() + .with_predicate(filter) + .with_schema(schema.clone()) + .round_trip_to_batches(vec![batch.clone()]) + .await + .unwrap(); + assert_eq!(read.len(), 0); + + // Predicate should prune no row groups + let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string())))); + let read = RoundTrip::new() + .with_predicate(filter) + .with_schema(schema) + .round_trip_to_batches(vec![batch]) + .await + .unwrap(); + assert_eq!(read.len(), 1); + } + + #[tokio::test] + async fn evolved_schema_column_type_filter_ints() { + // The table and filter have a common data type, but the file schema differs + let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)])); + let batch = create_batch(vec![("c1", c1.clone())]); + + let schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)])); + + // Predicate should prune all row groups + let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5)))); + let read = RoundTrip::new() + .with_predicate(filter) + .with_schema(schema.clone()) + .round_trip_to_batches(vec![batch.clone()]) + .await + .unwrap(); + assert_eq!(read.len(), 0); + + // Predicate should prune no row groups + let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1)))); + let read = RoundTrip::new() + .with_predicate(filter) + .with_schema(schema) + .round_trip_to_batches(vec![batch]) + .await + .unwrap(); + assert_eq!(read.len(), 1); + } + #[tokio::test] async fn evolved_schema_disjoint_schema_filter() { let c1: ArrayRef = diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index 376b40336176..9e14425074f7 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -55,8 +55,9 @@ pub(super) struct ParquetOpener {
     pub limit: Option<usize>,
     /// Optional predicate to apply during the scan
    pub predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Schema of the output table
-    pub table_schema: SchemaRef,
+    /// Schema of the output table without partition columns.
+    /// This is the schema we coerce the physical file schema into.
+    pub logical_file_schema: SchemaRef,
     /// Optional hint for how large the initial request to read parquet metadata
     /// should be
     pub metadata_size_hint: Option<usize>,
@@ -104,13 +105,13 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
         let projected_schema =
-            SchemaRef::from(self.table_schema.project(&self.projection)?);
+            SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
         let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
         let schema_adapter = self
             .schema_adapter_factory
-            .create(projected_schema, Arc::clone(&self.table_schema));
+            .create(projected_schema, Arc::clone(&self.logical_file_schema));
         let predicate = self.predicate.clone();
-        let table_schema = Arc::clone(&self.table_schema);
+        let logical_file_schema = Arc::clone(&self.logical_file_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
         let coerce_int96 = self.coerce_int96;
@@ -141,17 +142,20 @@ impl FileOpener for ParquetOpener {
             .await?;
             // Note about schemas: we are actually dealing with **3 different schemas** here:
-            // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
-            // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
+            // - The table schema as defined by the TableProvider.
+            //   This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+            // - The logical file schema: this is the table schema minus any hive partition columns and projections.
+            //   This is what the physical file schema is coerced to.
             // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
             let mut physical_file_schema = Arc::clone(reader_metadata.schema());
             // The schema loaded from the file may not be the same as the
             // desired schema (for example if we want to instruct the parquet
             // reader to read strings using Utf8View instead).
Update if necessary - if let Some(merged) = - apply_file_schema_type_coercions(&table_schema, &physical_file_schema) - { + if let Some(merged) = apply_file_schema_type_coercions( + &logical_file_schema, + &physical_file_schema, + ) { physical_file_schema = Arc::new(merged); options = options.with_schema(Arc::clone(&physical_file_schema)); reader_metadata = ArrowReaderMetadata::try_new( @@ -178,7 +182,7 @@ impl FileOpener for ParquetOpener { // Build predicates for this specific file let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates( predicate.as_ref(), - &physical_file_schema, + &logical_file_schema, &predicate_creation_errors, ); @@ -215,7 +219,7 @@ impl FileOpener for ParquetOpener { let row_filter = row_filter::build_row_filter( &predicate, &physical_file_schema, - &table_schema, + &logical_file_schema, builder.metadata(), reorder_predicates, &file_metrics, diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index cd0cbf087f7a..cde9e56c9280 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -426,7 +426,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result, physical_file_schema: &SchemaRef, - table_schema: &SchemaRef, + logical_file_schema: &SchemaRef, metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, @@ -447,7 +447,7 @@ pub fn build_row_filter( FilterCandidateBuilder::new( Arc::clone(expr), Arc::clone(physical_file_schema), - Arc::clone(table_schema), + Arc::clone(logical_file_schema), Arc::clone(schema_adapter_factory), ) .build(metadata) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 13684db8ea15..69347f440c36 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -481,7 +481,7 @@ impl FileSource for ParquetSource { .expect("Batch size must set before creating ParquetOpener"), limit: base_config.limit, predicate: self.predicate.clone(), - table_schema: Arc::clone(&base_config.file_schema), + logical_file_schema: Arc::clone(&base_config.file_schema), metadata_size_hint: self.metadata_size_hint, metrics: self.metrics().clone(), parquet_file_reader_factory, diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index 40d93d4647cf..ea1d7ac890ee 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -610,6 +610,9 @@ impl PruningPredicate { let statistics_batch = build_statistics_record_batch(statistics, &self.required_columns)?; + println!("statistics_batch: {statistics_batch:?}"); + println!("predicate_expr: {:?}", self.predicate_expr); + // Evaluate the pruning predicate on that record batch and append any results to the builder builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?); From 7633ad594e4ff868e93b1c9d2f14788459c45016 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 18 May 2025 21:30:45 -0700 Subject: [PATCH 2/6] comment --- .../src/datasource/physical_plan/parquet.rs | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index bb9b9f5f95d1..27b78c59c08f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs 
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -897,15 +897,17 @@ mod tests { .unwrap(); assert_eq!(read.len(), 0); - // Predicate should prune no row groups - let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1)))); - let read = RoundTrip::new() - .with_predicate(filter) - .with_schema(schema) - .round_trip_to_batches(vec![batch]) - .await - .unwrap(); - assert_eq!(read.len(), 1); + // TODO: this is failing on main, and has been for a long time! + // See + // // Predicate should prune no row groups + // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1)))); + // let read = RoundTrip::new() + // .with_predicate(filter) + // .with_schema(schema) + // .round_trip_to_batches(vec![batch]) + // .await + // .unwrap(); + // assert_eq!(read.len(), 1); } #[tokio::test] From cb5a6b2371ce16dcfc954fcf4315f936eea85d74 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 06:36:16 -0700 Subject: [PATCH 3/6] Update datafusion/core/src/datasource/physical_plan/parquet.rs --- datafusion/core/src/datasource/physical_plan/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 27b78c59c08f..efcf11457c19 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -898,7 +898,7 @@ mod tests { assert_eq!(read.len(), 0); // TODO: this is failing on main, and has been for a long time! - // See + // See https://github.com/apache/datafusion/pull/16086#discussion_r2094817127 // // Predicate should prune no row groups // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1)))); // let read = RoundTrip::new() From b52ee28989171c1dbede935763d8e8bebd43d291 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 06:37:41 -0700 Subject: [PATCH 4/6] remove prints --- datafusion/physical-optimizer/src/pruning.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index ea1d7ac890ee..40d93d4647cf 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -610,9 +610,6 @@ impl PruningPredicate { let statistics_batch = build_statistics_record_batch(statistics, &self.required_columns)?; - println!("statistics_batch: {statistics_batch:?}"); - println!("predicate_expr: {:?}", self.predicate_expr); - // Evaluate the pruning predicate on that record batch and append any results to the builder builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?); From 8a9fa2799b9f990b0fcfaac70e5557b9804e857e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 07:31:11 -0700 Subject: [PATCH 5/6] better test --- .../src/datasource/physical_plan/parquet.rs | 79 +++++++++++++------ 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index efcf11457c19..7515065fe410 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -100,6 +100,7 @@ mod tests { predicate: Option, pushdown_predicate: bool, page_index_predicate: bool, + bloom_filters: bool, } impl 
RoundTrip {
@@ -132,6 +133,11 @@ mod tests {
             self
         }
+        fn with_bloom_filters(mut self) -> Self {
+            self.bloom_filters = true;
+            self
+        }
+
         /// run the test, returning only the resulting RecordBatches
         async fn round_trip_to_batches(
             self,
@@ -156,10 +162,20 @@ mod tests {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
+            } else {
+                source = source.with_pushdown_filters(false);
             }
             if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
+            } else {
+                source = source.with_enable_page_index(false);
+            }
+
+            if self.bloom_filters {
+                source = source.with_bloom_filter_on_read(true);
+            } else {
+                source = source.with_bloom_filter_on_read(false);
             }
             source.with_schema(Arc::clone(&file_schema))
@@ -859,23 +875,30 @@ mod tests {
         // Predicate should prune all row groups
         let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
-        let read = RoundTrip::new()
+        let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_schema(schema.clone())
-            .round_trip_to_batches(vec![batch.clone()])
-            .await
-            .unwrap();
-        assert_eq!(read.len(), 0);
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+        assert_eq!(rt.batches.unwrap().len(), 0);
         // Predicate should prune no row groups
         let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
-        let read = RoundTrip::new()
+        let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_schema(schema)
-            .round_trip_to_batches(vec![batch])
-            .await
-            .unwrap();
-        assert_eq!(read.len(), 1);
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+        let read = rt.batches.unwrap().iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
     }
     #[tokio::test]
@@ -889,25 +912,28 @@ mod tests {
         // Predicate should prune all row groups
         let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
-        let read = RoundTrip::new()
+        let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_schema(schema.clone())
-            .round_trip_to_batches(vec![batch.clone()])
-            .await
-            .unwrap();
-        assert_eq!(read.len(), 0);
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(rt.batches.unwrap().len(), 0);
-        // TODO: this is failing on main, and has been for a long time!
-        // See https://github.com/apache/datafusion/pull/16086#discussion_r2094817127
-        // // Predicate should prune no row groups
-        // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
-        // let read = RoundTrip::new()
-        //     .with_predicate(filter)
-        //     .with_schema(schema)
-        //     .round_trip_to_batches(vec![batch])
-        //     .await
-        //     .unwrap();
-        // assert_eq!(read.len(), 1);
+        // Predicate should prune no row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        let read = rt.batches.unwrap().iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
     }
     #[tokio::test]
@@ -1692,6 +1718,7 @@ mod tests {
         let rt = RoundTrip::new()
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
+            .with_bloom_filters()
             .round_trip(vec![batch1])
             .await;

From 455582b4c1e19a08e62efe6fc9c8b49e6b04cc83 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Mon, 19 May 2025 07:31:16 -0700
Subject: [PATCH 6/6] fmt

---
 .../core/src/datasource/physical_plan/parquet.rs | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 7515065fe410..43fe84c5eb7a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -897,7 +897,12 @@ mod tests {
         let metrics = rt.parquet_exec.metrics().unwrap();
         assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
         assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
-        let read = rt.batches.unwrap().iter().map(|b| b.num_rows()).sum::<usize>();
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
         assert_eq!(read, 2, "Expected 2 rows to match the predicate");
     }
@@ -932,7 +937,12 @@ mod tests {
         // There should be no predicate evaluation errors
         let metrics = rt.parquet_exec.metrics().unwrap();
         assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
-        let read = rt.batches.unwrap().iter().map(|b| b.num_rows()).sum::<usize>();
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
         assert_eq!(read, 2, "Expected 2 rows to match the predicate");
     }
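
Note (not part of the patch series above): the new tests exercise pruning when the table/filter type (Utf8, UInt64) differs from the physical Parquet file type (Utf8View, Int8). The standalone sketch below assumes only the `arrow` crate and illustrates the underlying idea: row-group min/max statistics arrive in the physical file type, so they must be cast to the filter's type before a pruning comparison such as `c1 = 5` can be evaluated without predicate-evaluation errors. In the patch itself this coercion is handled by `apply_file_schema_type_coercions` and the schema adapter; the sketch only mirrors the idea with plain arrow arrays, and all names here are illustrative.

// Sketch: coerce Int8 min/max statistics to UInt64 before pruning on `c1 = 5`.
use arrow::array::{Array, Int8Array, UInt64Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Min/max statistics as the parquet file stores them (physical type Int8).
    let min_stats = Int8Array::from(vec![Some(1_i8)]);
    let max_stats = Int8Array::from(vec![Some(2_i8)]);

    // Cast the statistics into the logical/table type used by the filter.
    let min_u64 = cast(&min_stats, &DataType::UInt64)?;
    let max_u64 = cast(&max_stats, &DataType::UInt64)?;
    let min_u64 = min_u64.as_any().downcast_ref::<UInt64Array>().unwrap();
    let max_u64 = max_u64.as_any().downcast_ref::<UInt64Array>().unwrap();

    // Pruning check for the literal in `c1 = 5`: the row group can only
    // contain matches if min <= 5 && 5 <= max.
    let literal: u64 = 5;
    let can_match = min_u64.value(0) <= literal && literal <= max_u64.value(0);
    assert!(!can_match, "a row group with values 1..=2 should be pruned for c1 = 5");
    println!("row group pruned: {}", !can_match);
    Ok(())
}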