From 34229e89b0c116acf87c8509175e73adc0a677e1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 30 Oct 2022 08:56:07 +1300 Subject: [PATCH 1/4] Project columns within DatafusionArrowPredicate (#4005) (#4006) --- .../physical_plan/file_format/row_filter.rs | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index dd9c8fb650fd1..b8a8487247e44 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -65,7 +65,8 @@ use std::sync::Arc; #[derive(Debug)] pub(crate) struct DatafusionArrowPredicate { physical_expr: Arc<dyn PhysicalExpr>, - projection: ProjectionMask, + projection_mask: ProjectionMask, + projection: Vec<usize>, } impl DatafusionArrowPredicate { @@ -82,22 +83,40 @@ impl DatafusionArrowPredicate { let physical_expr = create_physical_expr(&candidate.expr, &df_schema, &schema, &props)?; + // ArrowPredicate::evaluate is passed columns in the order they appear in the file + // If the predicate has multiple columns, we therefore must project the columns based + // on the order they appear in the file + let projection = match candidate.projection.len() { + 0 | 1 => vec![], + len => { + let mut projection: Vec<_> = (0..len).collect(); + projection.sort_by_key(|x| candidate.projection[*x]); + projection + } + }; + Ok(Self { physical_expr, - projection: ProjectionMask::roots( + projection, + projection_mask: ProjectionMask::roots( metadata.file_metadata().schema_descr(), candidate.projection, - ), + ) }) } } impl ArrowPredicate for DatafusionArrowPredicate { fn projection(&self) -> &ProjectionMask { - &self.projection + &self.projection_mask } fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> { + let batch = match self.projection.is_empty() { + true => batch, + false => batch.project(&self.projection)? 
+ }; + match self .physical_expr .evaluate(&batch) From d160859c01085787840b8143c379e7541db65624 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 30 Oct 2022 14:46:24 +1300 Subject: [PATCH 2/4] Add test --- .../src/physical_plan/file_format/parquet.rs | 29 +++++++++++++++++++ .../physical_plan/file_format/row_filter.rs | 2 +- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 0dda94322619a..ecb88ae6aa753 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1617,6 +1617,35 @@ mod tests { assert_batches_sorted_eq!(expected, &read); } + #[tokio::test] + async fn multi_column_predicate_pushdown() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]); + + // Columns in different order to schema + let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar"))); + + // read/write them files: + let read = + round_trip_to_parquet(vec![batch1], None, None, Some(filter), true) + .await + .unwrap(); + + let expected = vec![ + "+-----+----+", + "| c1 | c2 |", + "+-----+----+", + "| Foo | 1 |", + "| bar | |", + "+-----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + #[tokio::test] async fn evolved_schema_incompatible_types() { let c1: ArrayRef = diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index b8a8487247e44..a8cc547edc8f4 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -90,7 +90,7 @@ impl DatafusionArrowPredicate { 0 | 1 => vec![], len => { let mut 
projection: Vec<_> = (0..len).collect(); - projection.sort_by_key(|x| candidate.projection[*x]); + projection.sort_unstable_by_key(|x| candidate.projection[*x]); projection } }; From 805b51a770b76059ee95f9f09f099e7093929d70 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 30 Oct 2022 14:49:41 +1300 Subject: [PATCH 3/4] Format --- datafusion/core/src/physical_plan/file_format/parquet.rs | 7 +++---- .../core/src/physical_plan/file_format/row_filter.rs | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index ecb88ae6aa753..7485ffce53fc6 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1630,10 +1630,9 @@ mod tests { let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar"))); // read/write them files: - let read = - round_trip_to_parquet(vec![batch1], None, None, Some(filter), true) - .await - .unwrap(); + let read = round_trip_to_parquet(vec![batch1], None, None, Some(filter), true) + .await + .unwrap(); let expected = vec![ "+-----+----+", diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index a8cc547edc8f4..fefab09b36405 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -101,7 +101,7 @@ impl DatafusionArrowPredicate { projection_mask: ProjectionMask::roots( metadata.file_metadata().schema_descr(), candidate.projection, - ) + ), }) } } @@ -114,7 +114,7 @@ impl ArrowPredicate for DatafusionArrowPredicate { fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult { let batch = match self.projection.is_empty() { true => batch, - false => batch.project(&self.projection)? 
+ false => batch.project(&self.projection)?, }; match self From 3f594c519781210e203c1b82f4fa5a5d6901c229 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 30 Oct 2022 07:46:33 -0400 Subject: [PATCH 4/4] Fix merge blunder --- datafusion/core/src/physical_plan/file_format/row_filter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index 2bb914436cf61..54bf4bb8fa42f 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -69,7 +69,6 @@ pub(crate) struct DatafusionArrowPredicate { physical_expr: Arc<dyn PhysicalExpr>, projection_mask: ProjectionMask, projection: Vec<usize>, - projection: ProjectionMask, /// how many rows were filtered out by this predicate rows_filtered: metrics::Count, /// how long was spent evaluating this predicate