Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 19 additions & 40 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ use crate::{
MemTable, ViewTable,
},
logical_plan::{PlanType, ToStringifiedPlan},
optimizer::eliminate_filter::EliminateFilter,
optimizer::eliminate_limit::EliminateLimit,
optimizer::{
eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit,
optimizer::Optimizer,
},
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
},
};
use log::{debug, trace};
use parking_lot::RwLock;
use std::string::String;
use std::sync::Arc;
Expand Down Expand Up @@ -1189,7 +1190,7 @@ pub struct SessionState {
/// Uuid for the session
pub session_id: String,
/// Responsible for optimizing a logical plan
pub optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
pub optimizer: Optimizer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a nice improvement 👍

/// Responsible for optimizing a physical execution plan
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
Expand Down Expand Up @@ -1255,7 +1256,7 @@ impl SessionState {

SessionState {
session_id,
optimizers: vec![
optimizer: Optimizer::new(vec![
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
Expand All @@ -1267,7 +1268,7 @@ impl SessionState {
Arc::new(FilterPushDown::new()),
Arc::new(LimitPushDown::new()),
Arc::new(SingleDistinctToGroupBy::new()),
],
]),
physical_optimizers: vec![
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
Expand Down Expand Up @@ -1328,9 +1329,9 @@ impl SessionState {
/// Replace the optimizer rules
pub fn with_optimizer_rules(
mut self,
optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
) -> Self {
self.optimizers = optimizers;
self.optimizer = Optimizer::new(rules);
self
}

Expand All @@ -1348,7 +1349,7 @@ impl SessionState {
mut self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) -> Self {
self.optimizers.push(optimizer_rule);
self.optimizer.rules.push(optimizer_rule);
self
}

Expand All @@ -1363,16 +1364,21 @@ impl SessionState {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let execution_props = &mut self.execution_props.clone();

if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan =
self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| {
let plan = self.optimizer.optimize(
e.plan.as_ref(),
execution_props,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
})?;
},
)?;

Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
Expand All @@ -1381,35 +1387,8 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
self.optimize_internal(plan, |_, _| {})
}
}

/// Optimizes the logical plan by applying optimizer rules, and
/// invoking observer function after each call
fn optimize_internal<F>(
&self,
plan: &LogicalPlan,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let execution_props = &mut self.execution_props.clone();
let optimizers = &self.optimizers;

let execution_props = execution_props.start_execution();

let mut new_plan = plan.clone();
debug!("Input logical plan:\n{}\n", plan.display_indent());
trace!("Full input logical plan:\n{:?}", plan);
for optimizer in optimizers {
new_plan = optimizer.optimize(&new_plan, execution_props)?;
observer(&new_plan, optimizer.as_ref());
self.optimizer.optimize(plan, execution_props, |_, _| {})
}
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
trace!("Full Optimized logical plan:\n {:?}", plan);
Ok(new_plan)
}

/// Creates a physical plan from a logical plan.
Expand Down
43 changes: 43 additions & 0 deletions datafusion/core/src/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

//! Query optimizer traits

use std::sync::Arc;

use log::{debug, trace};

use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
Expand All @@ -35,3 +39,42 @@ pub trait OptimizerRule {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
}

/// A rule-based optimizer.
///
/// Holds the list of [`OptimizerRule`]s that make up the logical optimization
/// pipeline. Each rule is held behind an `Arc`, so the derived `Clone` copies
/// the list and shares the rule objects rather than duplicating them.
#[derive(Clone)]
pub struct Optimizer {
/// All rules to apply
///
/// The field is public, so callers can append to or replace the list
/// directly.
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

impl Optimizer {
    /// Create a new optimizer with the given rules.
    ///
    /// The rules are stored as given; [`Optimizer::optimize`] applies them in
    /// the order in which they appear in `rules`.
    pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
        Self { rules }
    }

    /// Optimizes the logical plan by applying every optimizer rule in order,
    /// invoking the observer function after each rule.
    ///
    /// # Arguments
    ///
    /// * `plan` - the input plan. It is cloned as the starting point and is
    ///   never modified.
    /// * `execution_props` - `start_execution()` is called on it once before
    ///   any rule runs, and the result is handed to every rule.
    /// * `observer` - called after each rule that succeeds, with the plan
    ///   that rule produced and the rule itself (e.g. to record per-rule
    ///   plans for `EXPLAIN`).
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a rule. Rules after the failing
    /// one are not run, and the observer is not called for the failing rule.
    pub fn optimize<F>(
        &self,
        plan: &LogicalPlan,
        // NOTE(review): a reviewer of this change mentioned ongoing work to
        // remove the use of `ExecutionProps` here (#2614).
        execution_props: &mut ExecutionProps,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
    {
        let execution_props = execution_props.start_execution();

        let mut new_plan = plan.clone();
        debug!("Input logical plan:\n{}\n", plan.display_indent());
        trace!("Full input logical plan:\n{:?}", plan);
        for rule in &self.rules {
            // Each rule consumes the output of the previous one.
            new_plan = rule.optimize(&new_plan, execution_props)?;
            observer(&new_plan, rule.as_ref());
        }
        debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
        // Log the optimized plan here, not the input `plan`, so the trace
        // output matches its label.
        trace!("Full Optimized logical plan:\n {:?}", new_plan);
        Ok(new_plan)
    }
}