diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 67e2a4780d06c..8a0829cd5e4b2 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2987,9 +2987,7 @@ mod tests {
             JoinType::Inner,
             Some(Expr::Literal(ScalarValue::Null)),
         )?;
-        let expected_plan = "CrossJoin:\
-        \n  TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\
-        \n  TableScan: b projection=[c1]";
+        let expected_plan = "EmptyRelation";
         assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?));
 
         // JOIN ON expression must be boolean type
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 72d7277d6ae26..47012f777ad1e 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
 use super::ListingTableUrl;
 use super::PartitionedFile;
 use crate::execution::context::SessionState;
+use datafusion_common::internal_err;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{BinaryExpr, Operator};
 
@@ -285,25 +286,20 @@ async fn prune_partitions(
     let props = ExecutionProps::new();
 
-    // Applies `filter` to `batch` returning `None` on error
-    let do_filter = |filter| -> Option<ArrayRef> {
-        let expr = create_physical_expr(filter, &df_schema, &props).ok()?;
-        expr.evaluate(&batch)
-            .ok()?
-            .into_array(partitions.len())
-            .ok()
+    // Applies `filter` to `batch`, returning an error if the filter cannot be evaluated
+    let do_filter = |filter| -> Result<ArrayRef> {
+        let expr = create_physical_expr(filter, &df_schema, &props)?;
+        expr.evaluate(&batch)?.into_array(partitions.len())
     };
 
-    // Compute the conjunction of the filters, ignoring errors
+    // Compute the conjunction of the filters
     let mask = filters
         .iter()
-        .fold(None, |acc, filter| match (acc, do_filter(filter)) {
-            (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
-            (None, Some(r)) => Some(r.as_boolean().clone()),
-            (r, None) => r,
-        });
+        .map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
+        .reduce(|a, b| Ok(and(&a?, &b?)?));
 
     let mask = match mask {
-        Some(mask) => mask,
+        Some(Ok(mask)) => mask,
+        Some(Err(err)) => return Err(err),
         None => return Ok(partitions),
     };
 
@@ -401,8 +397,8 @@ fn evaluate_partition_prefix<'a>(
 
 /// Discover the partitions on the given path and prune out files
 /// that belong to irrelevant partitions using `filters` expressions.
-/// `filters` might contain expressions that can be resolved only at the
-/// file level (e.g. Parquet row group pruning).
+/// `filters` should only contain expressions that can be evaluated
+/// using only the partition columns.
 pub async fn pruned_partition_list<'a>(
     ctx: &'a SessionState,
     store: &'a dyn ObjectStore,
@@ -413,6 +409,12 @@
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
     // if no partition col => simply list all the files
     if partition_cols.is_empty() {
+        if !filters.is_empty() {
+            return internal_err!(
+                "Got partition filters for unpartitioned table {}",
+                table_path
+            );
+        }
         return Ok(Box::pin(
             table_path
                 .list_all_files(ctx, store, file_extension)
@@ -631,13 +633,11 @@ mod tests {
         ]);
         let filter1 = Expr::eq(col("part1"), lit("p1v2"));
         let filter2 = Expr::eq(col("part2"), lit("p2v1"));
-        // filter3 cannot be resolved at partition pruning
-        let filter3 = Expr::eq(col("part2"), col("other"));
         let pruned = pruned_partition_list(
             &state,
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-            &[filter1, filter2, filter3],
+            &[filter1, filter2],
             ".parquet",
             &[
                 (String::from("part1"), DataType::Utf8),
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index a9c6aec175371..1e9f06c20b476 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -782,6 +782,16 @@ impl ListingTable {
     }
 }
 
+// Expressions can be used for partition pruning if they can be evaluated using
+// only the partition columns and there are partition columns.
+fn can_be_evaluted_for_partition_pruning(
+    partition_column_names: &[&str],
+    expr: &Expr,
+) -> bool {
+    !partition_column_names.is_empty()
+        && expr_applicable_for_cols(partition_column_names, expr)
+}
+
 #[async_trait]
 impl TableProvider for ListingTable {
     fn as_any(&self) -> &dyn Any {
@@ -807,10 +817,28 @@ impl TableProvider for ListingTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        // extract types of partition columns
+        let table_partition_cols = self
+            .options
+            .table_partition_cols
+            .iter()
+            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let table_partition_col_names = table_partition_cols
+            .iter()
+            .map(|field| field.name().as_str())
+            .collect::<Vec<_>>();
+        // If the filters can be resolved using only partition cols, there is no need to
+        // push them down to the TableScan; otherwise `unhandled` pruning predicates will be generated
+        let (partition_filters, filters): (Vec<_>, Vec<_>) =
+            filters.iter().cloned().partition(|filter| {
+                can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
+            });
         // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
         let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
         let (mut partitioned_file_lists, statistics) = self
-            .list_files_for_scan(session_state, filters, limit)
+            .list_files_for_scan(session_state, &partition_filters, limit)
             .await?;
 
         // if no files need to be read, return an `EmptyExec`
@@ -846,28 +874,6 @@ impl TableProvider for ListingTable {
             None => {} // no ordering required
         };
 
-        // extract types of partition columns
-        let table_partition_cols = self
-            .options
-            .table_partition_cols
-            .iter()
-            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
-            .collect::<Result<Vec<_>>>()?;
-
-        // If the filters can be resolved using only partition cols, there is no need to
-        // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
-        let table_partition_col_names = table_partition_cols
-            .iter()
-            .map(|field| field.name().as_str())
-            .collect::<Vec<_>>();
-        let filters = filters
-            .iter()
-            .filter(|filter| {
-                !expr_applicable_for_cols(&table_partition_col_names, filter)
-            })
-            .cloned()
-            .collect::<Vec<_>>();
-
         let filters = conjunction(filters.to_vec())
             .map(|expr| -> Result<_> {
                 // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
@@ -908,18 +914,17 @@
         &self,
         filters: &[&Expr],
     ) -> Result<Vec<TableProviderFilterPushDown>> {
+        let partition_column_names = self
+            .options
+            .table_partition_cols
+            .iter()
+            .map(|col| col.0.as_str())
+            .collect::<Vec<_>>();
         filters
             .iter()
             .map(|filter| {
-                if expr_applicable_for_cols(
-                    &self
-                        .options
-                        .table_partition_cols
-                        .iter()
-                        .map(|col| col.0.as_str())
-                        .collect::<Vec<_>>(),
-                    filter,
-                ) {
+                if can_be_evaluted_for_partition_pruning(&partition_column_names, filter)
+                {
                     // if filter can be handled by partition pruning, it is exact
                     return Ok(TableProviderFilterPushDown::Exact);
                 }
diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt
index e66ba7477fc48..e73acc384cb3e 100644
--- a/datafusion/sqllogictest/test_files/arrow_files.slt
+++ b/datafusion/sqllogictest/test_files/arrow_files.slt
@@ -118,3 +118,8 @@ EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456
 ----
 logical_plan TableScan: arrow_partitioned projection=[f0], full_filters=[arrow_partitioned.part = Int32(456)]
 physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]}, projection=[f0]
+
+
+# Errors in partition filters should be reported
+query error Divide by zero error
+SELECT f0 FROM arrow_partitioned WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1;
diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt
index ce09475253445..da46a7e5e6796 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -133,3 +133,7 @@ create table foo as values (1), ('foo');
 
 query error No function matches
 select 1 group by substr('');
+
+# Error in filter should be reported
+query error Divide by zero
+SELECT c2 from aggregate_test_100 where CASE WHEN true THEN 1 / 0 ELSE 0 END = 1;