Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,34 @@ mod tests {
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}

#[tokio::test]
async fn multi_column_predicate_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);

// Columns in different order to schema
let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));

// read/write them files:
let read = round_trip_to_parquet(vec![batch1], None, None, Some(filter), true)
.await
.unwrap();

let expected = vec![
"+-----+----+",
"| c1 | c2 |",
"+-----+----+",
"| Foo | 1 |",
"| bar | |",
"+-----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
Expand Down
25 changes: 22 additions & 3 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ use crate::physical_plan::metrics;
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
physical_expr: Arc<dyn PhysicalExpr>,
projection: ProjectionMask,
projection_mask: ProjectionMask,
projection: Vec<usize>,
/// how many rows were filtered out by this predicate
rows_filtered: metrics::Count,
/// how long was spent evaluating this predicate
Expand All @@ -90,9 +91,22 @@ impl DatafusionArrowPredicate {
let physical_expr =
create_physical_expr(&candidate.expr, &df_schema, &schema, &props)?;

// ArrowPredicate::evaluate is passed columns in the order they appear in the file
// If the predicate has multiple columns, we therefore must project the columns based
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
len => {
let mut projection: Vec<_> = (0..len).collect();
projection.sort_unstable_by_key(|x| candidate.projection[*x]);
projection
}
};

Ok(Self {
physical_expr,
projection: ProjectionMask::roots(
projection,
projection_mask: ProjectionMask::roots(
metadata.file_metadata().schema_descr(),
candidate.projection,
),
Expand All @@ -104,10 +118,15 @@ impl DatafusionArrowPredicate {

impl ArrowPredicate for DatafusionArrowPredicate {
fn projection(&self) -> &ProjectionMask {
&self.projection
&self.projection_mask
}

fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
let batch = match self.projection.is_empty() {
true => batch,
false => batch.project(&self.projection)?,
};

// scoped timer updates on drop
let mut timer = self.time.timer();
match self
Expand Down