From c45a6c8b0cdf93139b66c434eab88c257724a8d0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 29 Jun 2021 15:54:49 -0400 Subject: [PATCH 1/2] Update API for extension planning to include logical plan --- datafusion/src/execution/context.rs | 10 ++++ datafusion/src/physical_plan/mod.rs | 14 +---- datafusion/src/physical_plan/planner.rs | 79 +++++++++++++++++++++---- datafusion/tests/user_defined_plan.rs | 9 ++- 4 files changed, 87 insertions(+), 25 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 436bce5952bdc..263fcd39b64f0 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -3376,6 +3376,16 @@ mod tests { "query not supported".to_string(), )) } + + fn create_physical_expr( + &self, + _e: &Expr, + _input_dfschema: &crate::logical_plan::DFSchema, + _input_schema: &Schema, + _ctx_state: &ExecutionContextState, + ) -> Result> { + unimplemented!() + } } struct MyQueryPlanner {} diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 2122751abb604..307fff619478e 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -20,8 +20,6 @@ use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; -use crate::execution::context::ExecutionContextState; -use crate::logical_plan::LogicalPlan; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::{ error::{DataFusionError, Result}, @@ -122,16 +120,8 @@ impl SQLMetric { } } -/// Physical query planner that converts a `LogicalPlan` to an -/// `ExecutionPlan` suitable for execution. -pub trait PhysicalPlanner { - /// Create a physical plan from a logical plan - fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - ctx_state: &ExecutionContextState, - ) -> Result>; -} +/// Physical planner interface +pub use self::planner::PhysicalPlanner; /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. /// diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 75f15653ba463..8da32ae76a331 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -39,9 +39,7 @@ use crate::physical_plan::sort::SortExec; use crate::physical_plan::udf; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{hash_utils, Partitioning}; -use crate::physical_plan::{ - AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner, WindowExpr, -}; +use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr}; use crate::prelude::JoinType; use crate::scalar::ScalarValue; use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys}; @@ -172,16 +170,51 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result { } } +/// Physical query planner that converts a `LogicalPlan` to an +/// `ExecutionPlan` suitable for execution. +pub trait PhysicalPlanner { + /// Create a physical plan from a logical plan + fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ctx_state: &ExecutionContextState, + ) -> Result>; + + /// Create a physical expression from a logical expression + /// suitable for evaluation + /// + /// `e`: the expression to convert + /// + /// `input_dfschema`: the logical plan schema for evaluating `e` + /// + /// `input_schema`: the physical schema for evaluating `e` + fn create_physical_expr( + &self, + e: &Expr, + input_dfschema: &DFSchema, + input_schema: &Schema, + ctx_state: &ExecutionContextState, + ) -> Result>; +} + /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`]. pub trait ExtensionPlanner { /// Create a physical plan for a [`UserDefinedLogicalNode`]. - /// This errors when the planner knows how to plan the concrete implementation of `node` - /// but errors while doing so, and `None` when the planner does not know how to plan the `node` - /// and wants to delegate the planning to another [`ExtensionPlanner`]. + /// + /// `input_dfschema`: the logical plan schema for the inputs to this node + /// + /// Returns an error when the planner knows how to plan the concrete + /// implementation of `node` but errors while doing so. + /// + /// Returns `None` when the planner does not know how to plan the + /// `node` and wants to delegate the planning to another + /// [`ExtensionPlanner`]. fn plan_extension( &self, + planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, - inputs: &[Arc], + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], ctx_state: &ExecutionContextState, ) -> Result>>; } @@ -210,6 +243,24 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { let plan = self.create_initial_plan(logical_plan, ctx_state)?; self.optimize_plan(plan, ctx_state) } + + /// Create a physical expression from a logical expression + /// suitable for evaluation + /// + /// `e`: the expression to convert + /// + /// `input_dfschema`: the logical plan schema for evaluating `e` + /// + /// `input_schema`: the physical schema for evaluating `e` + fn create_physical_expr( + &self, + e: &Expr, + input_dfschema: &DFSchema, + input_schema: &Schema, + ctx_state: &ExecutionContextState, + ) -> Result> { + self.create_physical_expr(e, input_dfschema, input_schema, ctx_state) + } } impl DefaultPhysicalPlanner { @@ -721,7 +772,7 @@ impl DefaultPhysicalPlanner { ))) } LogicalPlan::Extension { node } => { - let inputs = node + let physical_inputs = node .inputs() .into_iter() .map(|input_plan| self.create_initial_plan(input_plan, ctx_state)) @@ -733,7 +784,13 @@ impl DefaultPhysicalPlanner { if let Some(plan) = maybe_plan { Ok(Some(plan)) } else { - planner.plan_extension(node.as_ref(), &inputs, ctx_state) + planner.plan_extension( + self, + node.as_ref(), + &node.inputs(), + &physical_inputs, + ctx_state, + ) } }, )?; @@ -1644,8 +1701,10 @@ mod tests { /// Create a physical plan for an extension node fn plan_extension( &self, + _planner: &dyn PhysicalPlanner, _node: &dyn UserDefinedLogicalNode, - _inputs: &[Arc], + _logical_inputs: &[&LogicalPlan], + _physical_inputs: &[Arc], _ctx_state: &ExecutionContextState, ) -> Result>> { Ok(Some(Arc::new(NoOpExecutionPlan { diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 22ebec8b9a994..21b49638d23a1 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -321,16 +321,19 @@ impl ExtensionPlanner for TopKPlanner { /// Create a physical plan for an extension node fn plan_extension( &self, + _planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, - inputs: &[Arc], + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], _ctx_state: &ExecutionContextState, ) -> Result>> { Ok( if let Some(topk_node) = node.as_any().downcast_ref::() { - assert_eq!(inputs.len(), 1, "Inconsistent number of inputs"); + assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs"); + assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs"); // figure out input name Some(Arc::new(TopKExec { - input: inputs[0].clone(), + input: physical_inputs[0].clone(), k: topk_node.k, })) } else { From d61280a5fe506b2081b9dd1460c0e8485ebf236a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 30 Jun 2021 17:14:15 -0400 Subject: [PATCH 2/2] Review comments --- datafusion/src/execution/context.rs | 2 +- datafusion/src/physical_plan/planner.rs | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 263fcd39b64f0..d5a84869ad94a 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -3379,7 +3379,7 @@ mod tests { fn create_physical_expr( &self, - _e: &Expr, + _expr: &Expr, _input_dfschema: &crate::logical_plan::DFSchema, _input_schema: &Schema, _ctx_state: &ExecutionContextState, diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 8da32ae76a331..5b43ec12bbf03 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -183,14 +183,14 @@ pub trait PhysicalPlanner { /// Create a physical expression from a logical expression /// suitable for evaluation /// - /// `e`: the expression to convert + /// `expr`: the expression to convert /// /// `input_dfschema`: the logical plan schema for evaluating `e` /// /// `input_schema`: the physical schema for evaluating `e` fn create_physical_expr( &self, - e: &Expr, + expr: &Expr, input_dfschema: &DFSchema, input_schema: &Schema, ctx_state: &ExecutionContextState, @@ -254,12 +254,18 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { /// `input_schema`: the physical schema for evaluating `e` fn create_physical_expr( &self, - e: &Expr, + expr: &Expr, input_dfschema: &DFSchema, input_schema: &Schema, ctx_state: &ExecutionContextState, ) -> Result> { - self.create_physical_expr(e, input_dfschema, input_schema, ctx_state) + DefaultPhysicalPlanner::create_physical_expr( + self, + expr, + input_dfschema, + input_schema, + ctx_state, + ) } }