diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 850ce745c8c..c2202c59110 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -154,7 +154,10 @@ impl ExecutionContext {
                 ))),
             },
 
-            plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))),
+            plan => Ok(Arc::new(DataFrameImpl::new(
+                self.state.clone(),
+                &self.optimize(&plan)?,
+            ))),
         }
     }
 
@@ -1702,6 +1705,23 @@ mod tests {
         }
         Ok(())
     }
+    #[test]
+    fn ctx_sql_should_optimize_plan() -> Result<()> {
+        let mut ctx = ExecutionContext::new();
+        let plan1 =
+            ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
+
+        let opt_plan1 = ctx.optimize(&plan1)?;
+
+        let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
+
+        assert_eq!(
+            format!("{:?}", opt_plan1),
+            format!("{:?}", plan2.to_logical_plan())
+        );
+
+        Ok(())
+    }
 
     #[tokio::test]
     async fn scalar_udf() -> Result<()> {
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index 115da4b3010..432b64501a3 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -18,7 +18,7 @@
 //! Projection Push Down optimizer rule ensures that only referenced columns are
 //! loaded into memory
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan, ToDFSchema};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -57,16 +57,9 @@ impl ProjectionPushDown {
 
 fn get_projected_schema(
     schema: &Schema,
-    projection: &Option<Vec<usize>>,
     required_columns: &HashSet<String>,
     has_projection: bool,
 ) -> Result<(Vec<usize>, DFSchemaRef)> {
-    if projection.is_some() {
-        return Err(DataFusionError::Internal(
-            "Cannot run projection push-down rule more than once".to_string(),
-        ));
-    }
-
     // once we reach the table scan, we can use the accumulated set of column
     // names to construct the set of column indexes in the scan
     //
@@ -242,16 +235,11 @@ fn optimize_plan(
         LogicalPlan::TableScan {
             table_name,
             source,
-            projection,
             filters,
             ..
         } => {
-            let (projection, projected_schema) = get_projected_schema(
-                &source.schema(),
-                projection,
-                required_columns,
-                has_projection,
-            )?;
+            let (projection, projected_schema) =
+                get_projected_schema(&source.schema(), required_columns, has_projection)?;
 
             // return the table scan with projection
             Ok(LogicalPlan::TableScan {
@@ -491,6 +479,26 @@ mod tests {
         Ok(())
     }
 
+    /// tests that optimizing twice yields same plan
+    #[test]
+    fn test_double_optimization() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(&[col("b")])?
+            .project(&[lit(1).alias("a")])?
+            .build()?;
+
+        let optimized_plan1 = optimize(&plan).expect("failed to optimize plan");
+        let optimized_plan2 =
+            optimize(&optimized_plan1).expect("failed to optimize plan");
+
+        let formatted_plan1 = format!("{:?}", optimized_plan1);
+        let formatted_plan2 = format!("{:?}", optimized_plan2);
+        assert_eq!(formatted_plan1, formatted_plan2);
+        Ok(())
+    }
+
     /// tests that it removes an aggregate is never used downstream
     #[test]
     fn table_unused_aggregate() -> Result<()> {
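
Usage note (not part of the patch): the sketch below mirrors the new ctx_sql_should_optimize_plan test and shows what the context.rs change means for callers. ExecutionContext::sql now returns a DataFrame whose logical plan has already been run through ExecutionContext::optimize, which is why the projection push-down rule above must tolerate being applied more than once. The import paths and the main wrapper are assumptions about the crate layout at the time of this diff; the API calls themselves are the ones exercised in the tests.

// Sketch only: import paths assumed from the file paths in the diff above.
use datafusion::dataframe::DataFrame;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;

fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let query = "SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE";

    // Building the plan by hand and optimizing it explicitly...
    let plan = ctx.create_logical_plan(query)?;
    let optimized = ctx.optimize(&plan)?;

    // ...now yields the same plan that `sql` hands back: the returned DataFrame
    // carries the optimized logical plan rather than the raw parsed one.
    let df = ctx.sql(query)?;
    assert_eq!(
        format!("{:?}", optimized),
        format!("{:?}", df.to_logical_plan())
    );

    Ok(())
}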