diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ec2a27542c56..cc3c54a965dc 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1249,13 +1249,25 @@ pub fn table_scan<'a>( name: Option>>, table_schema: &Schema, projection: Option>, +) -> Result { + table_scan_with_filters(name, table_schema, projection, vec![]) +} + +/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema, +/// and inlined filters. +/// This is mostly used for testing and documentation. +pub fn table_scan_with_filters<'a>( + name: Option>>, + table_schema: &Schema, + projection: Option>, + filters: Vec, ) -> Result { let table_source = table_source(table_schema); let name = name .map(|n| n.into()) .unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE)) .to_owned_reference(); - LogicalPlanBuilder::scan(name, table_source, projection) + LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters) } fn table_source(table_schema: &Schema) -> Arc { diff --git a/datafusion/optimizer/src/simplify_expressions/context.rs b/datafusion/optimizer/src/simplify_expressions/context.rs index 0fe1e6ae81e8..34f3908c7e42 100644 --- a/datafusion/optimizer/src/simplify_expressions/context.rs +++ b/datafusion/optimizer/src/simplify_expressions/context.rs @@ -76,7 +76,7 @@ pub trait SimplifyInfo { /// assert_eq!(simplified, col("b").lt(lit(2))); /// ``` pub struct SimplifyContext<'a> { - schemas: Vec, + schema: Option, props: &'a ExecutionProps, } @@ -84,14 +84,14 @@ impl<'a> SimplifyContext<'a> { /// Create a new SimplifyContext pub fn new(props: &'a ExecutionProps) -> Self { Self { - schemas: vec![], + schema: None, props, } } /// Register a [`DFSchemaRef`] with this context pub fn with_schema(mut self, schema: DFSchemaRef) -> Self { - self.schemas.push(schema); + self.schema = Some(schema); self } } @@ -99,7 +99,7 @@ impl<'a> SimplifyContext<'a> { impl<'a> SimplifyInfo for SimplifyContext<'a> { /// returns true if this Expr has boolean type fn is_boolean_type(&self, expr: &Expr) -> Result { - for schema in &self.schemas { + for schema in &self.schema { if let Ok(DataType::Boolean) = expr.get_type(schema) { return Ok(true); } @@ -110,32 +110,22 @@ impl<'a> SimplifyInfo for SimplifyContext<'a> { /// Returns true if expr is nullable fn nullable(&self, expr: &Expr) -> Result { - self.schemas - .iter() - .find_map(|schema| { - // expr may be from another input, so ignore errors - // by converting to None to keep trying - expr.nullable(schema.as_ref()).ok() - }) - .ok_or_else(|| { - // This means we weren't able to compute `Expr::nullable` with - // *any* input schemas, signalling a problem - DataFusionError::Internal(format!( - "Could not find columns in '{expr}' during simplify" - )) - }) + let schema = self.schema.as_ref().ok_or_else(|| { + DataFusionError::Internal( + "attempt to get nullability without schema".to_string(), + ) + })?; + expr.nullable(schema.as_ref()) } /// Returns data type of this expr needed for determining optimized int type of a value fn get_data_type(&self, expr: &Expr) -> Result { - if self.schemas.len() == 1 { - expr.get_type(&self.schemas[0]) - } else { - Err(DataFusionError::Internal( - "The expr has more than one schema, could not determine data type" - .to_string(), - )) - } + let schema = self.schema.as_ref().ok_or_else(|| { + DataFusionError::Internal( + "attempt to get data type without schema".to_string(), + ) + })?; + expr.get_type(schema) } fn execution_props(&self) -> &ExecutionProps { diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 9591563f02f6..6717fe42bf20 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -17,10 +17,12 @@ //! Simplify expressions optimizer rule and implementation +use std::sync::Arc; + use super::{ExprSimplifier, SimplifyContext}; use crate::utils::merge_schema; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::{DFSchemaRef, Result}; +use datafusion_common::{DFSchema, DFSchemaRef, Result}; use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan}; use datafusion_physical_expr::execution_props::ExecutionProps; @@ -61,16 +63,16 @@ impl SimplifyExpressions { plan: &LogicalPlan, execution_props: &ExecutionProps, ) -> Result { - // Pass down the `children merge schema` and `plan schema` to evaluate expression types. - // pass all `child schema` and `plan schema` isn't enough, because like `t1 semi join t2 on - // on t1.id = t2.id`, each individual schema can't contain all the columns in it. - let children_merge_schema = DFSchemaRef::new(merge_schema(plan.inputs())); - let schemas = vec![plan.schema(), &children_merge_schema]; - let info = schemas - .into_iter() - .fold(SimplifyContext::new(execution_props), |context, schema| { - context.with_schema(schema.clone()) - }); + let schema = if !plan.inputs().is_empty() { + DFSchemaRef::new(merge_schema(plan.inputs())) + } else if let LogicalPlan::TableScan(_) = plan { + // When predicates are pushed into a table scan, there needs to be + // a schema to resolve the fields against. + Arc::clone(plan.schema()) + } else { + Arc::new(DFSchema::empty()) + }; + let info = SimplifyContext::new(execution_props).with_schema(schema); let simplifier = ExprSimplifier::new(info); @@ -127,7 +129,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use chrono::{DateTime, TimeZone, Utc}; use datafusion_common::ScalarValue; - use datafusion_expr::{or, BinaryExpr, Cast, Operator}; + use datafusion_expr::logical_plan::builder::table_scan_with_filters; + use datafusion_expr::{call_fn, or, BinaryExpr, Cast, Operator}; use crate::OptimizerContext; use datafusion_expr::logical_plan::table_scan; @@ -808,4 +811,41 @@ mod tests { assert_optimized_plan_eq(&plan, expected) } + + #[test] + fn simplify_project_scalar_fn() -> Result<()> { + // Issue https://github.com/apache/arrow-datafusion/issues/5996 + let schema = Schema::new(vec![Field::new("f", DataType::Float64, false)]); + let plan = table_scan(Some("test"), &schema, None)? + .project(vec![call_fn("power", vec![col("f"), lit(1.0)])?])? + .build()?; + + // before simplify: power(t.f, 1.0) + // after simplify: t.f as "power(t.f, 1.0)" + let expected = "Projection: test.f AS power(test.f,Float64(1))\ + \n TableScan: test"; + + assert_optimized_plan_eq(&plan, expected) + } + + #[test] + fn simplify_scan_predicate() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("f", DataType::Float64, false), + Field::new("g", DataType::Float64, false), + ]); + let plan = table_scan_with_filters( + Some("test"), + &schema, + None, + vec![col("g").eq(call_fn("power", vec![col("f"), lit(1.0)])?)], + )? + .build()?; + + // before simplify: t.g = power(t.f, 1.0) + // after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)" + let expected = + "TableScan: test, unsupported_filters=[g = f AS g = power(f,Float64(1))]"; + assert_optimized_plan_eq(&plan, expected) + } }