Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2987,9 +2987,7 @@ mod tests {
JoinType::Inner,
Some(Expr::Literal(ScalarValue::Null)),
)?;
let expected_plan = "CrossJoin:\
\n TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\
\n TableScan: b projection=[c1]";
let expected_plan = "EmptyRelation";
assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?));

// JOIN ON expression must be boolean type
Expand Down
36 changes: 18 additions & 18 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;
use super::ListingTableUrl;
use super::PartitionedFile;
use crate::execution::context::SessionState;
use datafusion_common::internal_err;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{BinaryExpr, Operator};

Expand Down Expand Up @@ -285,25 +286,20 @@ async fn prune_partitions(
let props = ExecutionProps::new();

// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &props).ok()?;
expr.evaluate(&batch)
.ok()?
.into_array(partitions.len())
.ok()
let do_filter = |filter| -> Result<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &props)?;
expr.evaluate(&batch)?.into_array(partitions.len())
};

//.Compute the conjunction of the filters, ignoring errors
//.Compute the conjunction of the filters
let mask = filters
.iter()
.fold(None, |acc, filter| match (acc, do_filter(filter)) {
(Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
(None, Some(r)) => Some(r.as_boolean().clone()),
(r, None) => r,
});
.map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
.reduce(|a, b| Ok(and(&a?, &b?)?));

let mask = match mask {
Some(mask) => mask,
Some(Ok(mask)) => mask,
Some(Err(err)) => return Err(err),
None => return Ok(partitions),
};

Expand Down Expand Up @@ -401,8 +397,8 @@ fn evaluate_partition_prefix<'a>(

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` might contain expressions that can be resolved only at the
/// file level (e.g. Parquet row group pruning).
/// `filters` should only contain expressions that can be evaluated
/// using only the partition columns.
pub async fn pruned_partition_list<'a>(
ctx: &'a SessionState,
store: &'a dyn ObjectStore,
Expand All @@ -413,6 +409,12 @@ pub async fn pruned_partition_list<'a>(
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
// if no partition col => simply list all the files
if partition_cols.is_empty() {
if !filters.is_empty() {
return internal_err!(
"Got partition filters for unpartitioned table {}",
table_path
);
}
return Ok(Box::pin(
table_path
.list_all_files(ctx, store, file_extension)
Expand Down Expand Up @@ -631,13 +633,11 @@ mod tests {
]);
let filter1 = Expr::eq(col("part1"), lit("p1v2"));
let filter2 = Expr::eq(col("part2"), lit("p2v1"));
// filter3 cannot be resolved at partition pruning
let filter3 = Expr::eq(col("part2"), col("other"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2, filter3],
&[filter1, filter2],
".parquet",
&[
(String::from("part1"), DataType::Utf8),
Expand Down
69 changes: 37 additions & 32 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,16 @@ impl ListingTable {
}
}

// Expressions can be used for parttion pruning if they can be evaluated using
// only the partiton columns and there are partition columns.
fn can_be_evaluted_for_partition_pruning(
partition_column_names: &[&str],
expr: &Expr,
) -> bool {
!partition_column_names.is_empty()
&& expr_applicable_for_cols(partition_column_names, expr)
}

#[async_trait]
impl TableProvider for ListingTable {
fn as_any(&self) -> &dyn Any {
Expand All @@ -807,10 +817,28 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// extract types of partition columns
let table_partition_cols = self
.options
.table_partition_cols
.iter()
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

let table_partition_col_names = table_partition_cols
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
// If the filters can be resolved using only partition cols, there is no need to
// pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
let (partition_filters, filters): (Vec<_>, Vec<_>) =
filters.iter().cloned().partition(|filter| {
can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
});
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
let (mut partitioned_file_lists, statistics) = self
.list_files_for_scan(session_state, filters, limit)
.list_files_for_scan(session_state, &partition_filters, limit)
.await?;

// if no files need to be read, return an `EmptyExec`
Expand Down Expand Up @@ -846,28 +874,6 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};

// extract types of partition columns
let table_partition_cols = self
.options
.table_partition_cols
.iter()
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

// If the filters can be resolved using only partition cols, there is no need to
// pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
let table_partition_col_names = table_partition_cols
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
let filters = filters
.iter()
.filter(|filter| {
!expr_applicable_for_cols(&table_partition_col_names, filter)
})
.cloned()
.collect::<Vec<_>>();

let filters = conjunction(filters.to_vec())
.map(|expr| -> Result<_> {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
Expand Down Expand Up @@ -908,18 +914,17 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
let partition_column_names = self
.options
.table_partition_cols
.iter()
.map(|col| col.0.as_str())
.collect::<Vec<_>>();
filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|col| col.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
if can_be_evaluted_for_partition_pruning(&partition_column_names, filter)
{
// if filter can be handled by partition pruning, it is exact
return Ok(TableProviderFilterPushDown::Exact);
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/sqllogictest/test_files/arrow_files.slt
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,8 @@ EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456
----
logical_plan TableScan: arrow_partitioned projection=[f0], full_filters=[arrow_partitioned.part = Int32(456)]
physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]}, projection=[f0]


# Errors in partition filters should be reported
query error Divide by zero error
SELECT f0 FROM arrow_partitioned WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1;
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/errors.slt
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,7 @@ create table foo as values (1), ('foo');

query error No function matches
select 1 group by substr('');

# Error in filter should be reported
query error Divide by zero
SELECT c2 from aggregate_test_100 where CASE WHEN true THEN 1 / 0 ELSE 0 END = 1;