From 708ef2fecad79402465adbc94f914c48bd214696 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Sun, 13 Oct 2024 10:36:52 +0200 Subject: [PATCH 1/2] Report errors in partition filters This patch fixes 2 bugs. First, errors in partition filters are silently ignored. Second, partition filters are allowed to be pushed down for unpartitioned tables even though such filters are never evaluated. The first bug is fixed by reporting errors for partition filters and only evaluating the filters we allowed as partition filters in `supports_filters_pushdown`. The second bug is fixed by only allowing partition filters to be pushed down when we have partition columns. --- datafusion/core/src/dataframe/mod.rs | 4 +- .../core/src/datasource/listing/helpers.rs | 36 +++++----- .../core/src/datasource/listing/table.rs | 69 ++++++++++--------- .../sqllogictest/test_files/arrow_files.slt | 5 ++ datafusion/sqllogictest/test_files/errors.slt | 4 ++ 5 files changed, 65 insertions(+), 53 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 67e2a4780d06c..8a0829cd5e4b2 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2987,9 +2987,7 @@ mod tests { JoinType::Inner, Some(Expr::Literal(ScalarValue::Null)), )?; - let expected_plan = "CrossJoin:\ - \n TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\ - \n TableScan: b projection=[c1]"; + let expected_plan = "EmptyRelation"; assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?)); // JOIN ON expression must be boolean type diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 72d7277d6ae26..47012f777ad1e 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use super::ListingTableUrl; use super::PartitionedFile; use crate::execution::context::SessionState; +use 
datafusion_common::internal_err; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{BinaryExpr, Operator}; @@ -285,25 +286,20 @@ async fn prune_partitions( let props = ExecutionProps::new(); // Applies `filter` to `batch` returning `None` on error - let do_filter = |filter| -> Option { - let expr = create_physical_expr(filter, &df_schema, &props).ok()?; - expr.evaluate(&batch) - .ok()? - .into_array(partitions.len()) - .ok() + let do_filter = |filter| -> Result { + let expr = create_physical_expr(filter, &df_schema, &props)?; + expr.evaluate(&batch)?.into_array(partitions.len()) }; - //.Compute the conjunction of the filters, ignoring errors + //.Compute the conjunction of the filters let mask = filters .iter() - .fold(None, |acc, filter| match (acc, do_filter(filter)) { - (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)), - (None, Some(r)) => Some(r.as_boolean().clone()), - (r, None) => r, - }); + .map(|f| do_filter(f).map(|a| a.as_boolean().clone())) + .reduce(|a, b| Ok(and(&a?, &b?)?)); let mask = match mask { - Some(mask) => mask, + Some(Ok(mask)) => mask, + Some(Err(err)) => return Err(err), None => return Ok(partitions), }; @@ -401,8 +397,8 @@ fn evaluate_partition_prefix<'a>( /// Discover the partitions on the given path and prune out files /// that belong to irrelevant partitions using `filters` expressions. -/// `filters` might contain expressions that can be resolved only at the -/// file level (e.g. Parquet row group pruning). +/// `filters` should only contain expressions that can be evaluated +/// using only the partition columns. 
pub async fn pruned_partition_list<'a>( ctx: &'a SessionState, store: &'a dyn ObjectStore, @@ -413,6 +409,12 @@ pub async fn pruned_partition_list<'a>( ) -> Result>> { // if no partition col => simply list all the files if partition_cols.is_empty() { + if !filters.is_empty() { + return internal_err!( + "Got partition filters for unpartitioned table {}", + table_path + ); + } return Ok(Box::pin( table_path .list_all_files(ctx, store, file_extension) @@ -631,13 +633,11 @@ mod tests { ]); let filter1 = Expr::eq(col("part1"), lit("p1v2")); let filter2 = Expr::eq(col("part2"), lit("p2v1")); - // filter3 cannot be resolved at partition pruning - let filter3 = Expr::eq(col("part2"), col("other")); let pruned = pruned_partition_list( &state, store.as_ref(), &ListingTableUrl::parse("file:///tablepath/").unwrap(), - &[filter1, filter2, filter3], + &[filter1, filter2], ".parquet", &[ (String::from("part1"), DataType::Utf8), diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a9c6aec175371..1e9f06c20b476 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -782,6 +782,16 @@ impl ListingTable { } } +// Expressions can be used for partition pruning if they can be evaluated using +// only the partition columns and there are partition columns. 
+fn can_be_evaluted_for_partition_pruning( + partition_column_names: &[&str], + expr: &Expr, +) -> bool { + !partition_column_names.is_empty() + && expr_applicable_for_cols(partition_column_names, expr) +} + #[async_trait] impl TableProvider for ListingTable { fn as_any(&self) -> &dyn Any { @@ -807,10 +817,28 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> Result> { + // extract types of partition columns + let table_partition_cols = self + .options + .table_partition_cols + .iter() + .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) + .collect::>>()?; + + let table_partition_col_names = table_partition_cols + .iter() + .map(|field| field.name().as_str()) + .collect::>(); + // If the filters can be resolved using only partition cols, there is no need to + // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated + let (partition_filters, filters): (Vec<_>, Vec<_>) = + filters.iter().cloned().partition(|filter| { + can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter) + }); // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here? 
let session_state = state.as_any().downcast_ref::().unwrap(); let (mut partitioned_file_lists, statistics) = self - .list_files_for_scan(session_state, filters, limit) + .list_files_for_scan(session_state, &partition_filters, limit) .await?; // if no files need to be read, return an `EmptyExec` @@ -846,28 +874,6 @@ impl TableProvider for ListingTable { None => {} // no ordering required }; - // extract types of partition columns - let table_partition_cols = self - .options - .table_partition_cols - .iter() - .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) - .collect::>>()?; - - // If the filters can be resolved using only partition cols, there is no need to - // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated - let table_partition_col_names = table_partition_cols - .iter() - .map(|field| field.name().as_str()) - .collect::>(); - let filters = filters - .iter() - .filter(|filter| { - !expr_applicable_for_cols(&table_partition_col_names, filter) - }) - .cloned() - .collect::>(); - let filters = conjunction(filters.to_vec()) .map(|expr| -> Result<_> { // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. 
@@ -908,18 +914,17 @@ impl TableProvider for ListingTable { &self, filters: &[&Expr], ) -> Result> { + let partition_column_names = self + .options + .table_partition_cols + .iter() + .map(|col| col.0.as_str()) + .collect::>(); filters .iter() .map(|filter| { - if expr_applicable_for_cols( - &self - .options - .table_partition_cols - .iter() - .map(|col| col.0.as_str()) - .collect::>(), - filter, - ) { + if can_be_evaluted_for_partition_pruning(&partition_column_names, filter) + { // if filter can be handled by partition pruning, it is exact return Ok(TableProviderFilterPushDown::Exact); } diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt index e66ba7477fc48..e73acc384cb3e 100644 --- a/datafusion/sqllogictest/test_files/arrow_files.slt +++ b/datafusion/sqllogictest/test_files/arrow_files.slt @@ -118,3 +118,8 @@ EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456 ---- logical_plan TableScan: arrow_partitioned projection=[f0], full_filters=[arrow_partitioned.part = Int32(456)] physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]}, projection=[f0] + + +# Errors in partition filters should be reported +query error Divide by zero error +SELECT f0 FROM arrow_partitioned WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1; diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index ce09475253445..76030abe30c9b 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -133,3 +133,7 @@ create table foo as values (1), ('foo'); query error No function matches select 1 group by substr(''); + +# Bug see +query error Divide by zero +SELECT c2 from aggregate_test_100 where CASE WHEN true THEN 1 / 0 ELSE 0 END = 1; From e6c773302084b6de4512d41e0082a7e0f0f8caef Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Sun, 13 
Oct 2024 18:09:54 +0200 Subject: [PATCH 2/2] Update datafusion/sqllogictest/test_files/errors.slt --- datafusion/sqllogictest/test_files/errors.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index 76030abe30c9b..da46a7e5e6796 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -134,6 +134,6 @@ create table foo as values (1), ('foo'); query error No function matches select 1 group by substr(''); -# Bug see +# Error in filter should be reported query error Divide by zero SELECT c2 from aggregate_test_100 where CASE WHEN true THEN 1 / 0 ELSE 0 END = 1;