Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,13 +1249,25 @@ pub fn table_scan<'a>(
name: Option<impl Into<TableReference<'a>>>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
table_scan_with_filters(name, table_schema, projection, vec![])
}

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
/// and inlined filters.
/// This is mostly used for testing and documentation.
pub fn table_scan_with_filters<'a>(
name: Option<impl Into<TableReference<'a>>>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
) -> Result<LogicalPlanBuilder> {
let table_source = table_source(table_schema);
let name = name
.map(|n| n.into())
.unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE))
.to_owned_reference();
LogicalPlanBuilder::scan(name, table_source, projection)
LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
}

fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
Expand Down
42 changes: 16 additions & 26 deletions datafusion/optimizer/src/simplify_expressions/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,30 +76,30 @@ pub trait SimplifyInfo {
/// assert_eq!(simplified, col("b").lt(lit(2)));
/// ```
pub struct SimplifyContext<'a> {
schemas: Vec<DFSchemaRef>,
schema: Option<DFSchemaRef>,
props: &'a ExecutionProps,
}

impl<'a> SimplifyContext<'a> {
/// Create a new SimplifyContext
pub fn new(props: &'a ExecutionProps) -> Self {
Self {
schemas: vec![],
schema: None,
props,
}
}

/// Register a [`DFSchemaRef`] with this context
pub fn with_schema(mut self, schema: DFSchemaRef) -> Self {
self.schemas.push(schema);
self.schema = Some(schema);
self
}
}

impl<'a> SimplifyInfo for SimplifyContext<'a> {
/// returns true if this Expr has boolean type
fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
for schema in &self.schemas {
for schema in &self.schema {
if let Ok(DataType::Boolean) = expr.get_type(schema) {
return Ok(true);
}
Expand All @@ -110,32 +110,22 @@ impl<'a> SimplifyInfo for SimplifyContext<'a> {

/// Returns true if expr is nullable
fn nullable(&self, expr: &Expr) -> Result<bool> {
self.schemas
.iter()
.find_map(|schema| {
// expr may be from another input, so ignore errors
// by converting to None to keep trying
expr.nullable(schema.as_ref()).ok()
})
.ok_or_else(|| {
// This means we weren't able to compute `Expr::nullable` with
// *any* input schemas, signalling a problem
DataFusionError::Internal(format!(
"Could not find columns in '{expr}' during simplify"
))
})
let schema = self.schema.as_ref().ok_or_else(|| {
DataFusionError::Internal(
"attempt to get nullability without schema".to_string(),
)
})?;
expr.nullable(schema.as_ref())
}

/// Returns data type of this expr needed for determining optimized int type of a value
fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
if self.schemas.len() == 1 {
expr.get_type(&self.schemas[0])
} else {
Err(DataFusionError::Internal(
"The expr has more than one schema, could not determine data type"
.to_string(),
))
}
let schema = self.schema.as_ref().ok_or_else(|| {
DataFusionError::Internal(
"attempt to get data type without schema".to_string(),
)
})?;
expr.get_type(schema)
}

fn execution_props(&self) -> &ExecutionProps {
Expand Down
64 changes: 52 additions & 12 deletions datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

//! Simplify expressions optimizer rule and implementation

use std::sync::Arc;

use super::{ExprSimplifier, SimplifyContext};
use crate::utils::merge_schema;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{DFSchemaRef, Result};
use datafusion_common::{DFSchema, DFSchemaRef, Result};
use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan};
use datafusion_physical_expr::execution_props::ExecutionProps;

Expand Down Expand Up @@ -61,16 +63,16 @@ impl SimplifyExpressions {
plan: &LogicalPlan,
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
// Pass down the `children merge schema` and `plan schema` to evaluate expression types.
// pass all `child schema` and `plan schema` isn't enough, because like `t1 semi join t2
// on t1.id = t2.id`, each individual schema can't contain all the columns in it.
let children_merge_schema = DFSchemaRef::new(merge_schema(plan.inputs()));
let schemas = vec![plan.schema(), &children_merge_schema];
let info = schemas
.into_iter()
.fold(SimplifyContext::new(execution_props), |context, schema| {
context.with_schema(schema.clone())
});
let schema = if !plan.inputs().is_empty() {
DFSchemaRef::new(merge_schema(plan.inputs()))
} else if let LogicalPlan::TableScan(_) = plan {
// When predicates are pushed into a table scan, there needs to be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this schema should be empty (aka have no columns) rather than use the same plan schema. Logically the schema should be the columns that are available to evaluate expressions within the plan node

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I made this so it used the empty schema whenever the plan node had no inputs, the test csv_query_group_by_and_having_and_where would fail, since it attempts to simplify predicates that are inlined into a scan:
https://github.com/apache/arrow-datafusion/blob/caa60337c7a57572d93d8bd3cbc18006aabe55e6/datafusion/expr/src/logical_plan/plan.rs#L1428-L1429

It seems that inlined scan filters are a bit of an exception in that they are evaluated on top of the scan itself.

For other kinds of node (e.g., Values) I think you're right though, so I pushed a commit that refines the logic a bit more to use the plan's schema only for table scans.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

// a schema to resolve the fields against.
Arc::clone(plan.schema())
} else {
Arc::new(DFSchema::empty())
};
let info = SimplifyContext::new(execution_props).with_schema(schema);

let simplifier = ExprSimplifier::new(info);

Expand Down Expand Up @@ -127,7 +129,8 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{DateTime, TimeZone, Utc};
use datafusion_common::ScalarValue;
use datafusion_expr::{or, BinaryExpr, Cast, Operator};
use datafusion_expr::logical_plan::builder::table_scan_with_filters;
use datafusion_expr::{call_fn, or, BinaryExpr, Cast, Operator};

use crate::OptimizerContext;
use datafusion_expr::logical_plan::table_scan;
Expand Down Expand Up @@ -808,4 +811,41 @@ mod tests {

assert_optimized_plan_eq(&plan, expected)
}

#[test]
fn simplify_project_scalar_fn() -> Result<()> {
// Issue https://github.com/apache/arrow-datafusion/issues/5996
let schema = Schema::new(vec![Field::new("f", DataType::Float64, false)]);
let plan = table_scan(Some("test"), &schema, None)?
.project(vec![call_fn("power", vec![col("f"), lit(1.0)])?])?
.build()?;

// before simplify: power(t.f, 1.0)
// after simplify: t.f as "power(t.f, 1.0)"
let expected = "Projection: test.f AS power(test.f,Float64(1))\
\n TableScan: test";

assert_optimized_plan_eq(&plan, expected)
}

#[test]
fn simplify_scan_predicate() -> Result<()> {
let schema = Schema::new(vec![
Field::new("f", DataType::Float64, false),
Field::new("g", DataType::Float64, false),
]);
let plan = table_scan_with_filters(
Some("test"),
&schema,
None,
vec![col("g").eq(call_fn("power", vec![col("f"), lit(1.0)])?)],
)?
.build()?;

// before simplify: t.g = power(t.f, 1.0)
// after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)"
let expected =
"TableScan: test, unsupported_filters=[g = f AS g = power(f,Float64(1))]";
assert_optimized_plan_eq(&plan, expected)
}
}