Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 51 additions & 33 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};

use crate::{utils, OptimizerConfig, OptimizerRule};

/// Optimization rule that eliminates LIMIT 0 or a useless LIMIT (skip: 0, fetch: None).
/// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
#[derive(Default)]
Expand All @@ -41,54 +41,71 @@ impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
let limit = match plan {
LogicalPlan::Limit(limit) => limit,
_ => return Ok(None),
};

match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
return Ok(Some(utils::optimize_children(self, input, config)?));
}
}
None => {
if limit.skip == 0 {
let input = limit.input.as_ref();
// The input can also be a Limit, so the rule is applied to it again.
return Ok(Some(
self.try_optimize(input, _config)?
.unwrap_or_else(|| input.clone()),
));
}
}
}
Ok(Some(utils::optimize_children(self, plan, config)?))
Ok(None)
}

/// Returns the fixed name of this rule, `"eliminate_limit"`.
fn name(&self) -> &str {
"eliminate_limit"
}

/// Requests bottom-up application: the optimizer visits a node's inputs before the node
/// itself, so this rule does not have to recurse into children on its own.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::optimizer::Optimizer;
use crate::test::*;
use crate::OptimizerContext;
use datafusion_common::Column;
use datafusion_expr::{
col,
logical_plan::{builder::LogicalPlanBuilder, JoinType},
sum,
};
use std::sync::Arc;

use crate::optimizer::OptimizerContext;
use crate::push_down_limit::PushDownLimit;
use crate::test::*;

use super::*;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = EliminateLimit::new()
.try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]);
let optimized_plan = optimizer
.optimize_recursively(
optimizer.rules.get(0).unwrap(),
plan,
&OptimizerContext::new(),
)?
.unwrap_or_else(|| plan.clone());

let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
Expand All @@ -99,13 +116,14 @@ mod tests {
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
.try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
.try_optimize(&optimized_plan, &OptimizerContext::new())
.unwrap()
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let config = OptimizerContext::new().with_max_passes(1);
let optimizer = Optimizer::with_rules(vec![
Arc::new(PushDownLimit::new()),
Arc::new(EliminateLimit::new()),
]);
let optimized_plan = optimizer
.optimize(plan, &config, observe)
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
119 changes: 118 additions & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ pub trait OptimizerRule {

/// A human readable name for this optimizer rule
fn name(&self) -> &str;

/// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
///
/// If a rule returns `None` (the default), it should traverse the plan recursively inside itself.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// If a rule use default None, its should traverse recursively plan inside itself
/// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
///
/// If a rule returns `None`, the default, it should traverse the plan recursively inside itself

fn apply_order(&self) -> Option<ApplyOrder> {
// Default: no traversal order is requested from the optimizer, so a rule that does not
// override this method is simply handed the plan through `try_optimize`.
None
}
Comment on lines +66 to +69
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's compatible with the original behavior.

}

/// Options to control the DataFusion Optimizer.
Expand Down Expand Up @@ -180,6 +187,44 @@ pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

/// If a rule returns an `ApplyOrder`, the optimizer drives the traversal of the plan's children
/// instead of the rule recursing into them itself.
/// The rule then only needs to handle the subtree pattern it matches.
///
/// Note: **sometimes** the result of an optimization can itself be optimized further, so the rule needs to be applied again.
///
/// Usage Example: Merge Limit (subtree pattern is: Limit-Limit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

/// ```rust
/// use datafusion_expr::{Limit, LogicalPlan, LogicalPlanBuilder};
/// use datafusion_common::Result;
/// fn merge_limit(parent: &Limit, child: &Limit) -> LogicalPlan {
/// // placeholder body so that the example compiles and runs
/// return parent.input.as_ref().clone();
/// }
/// fn try_optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
/// match plan {
/// LogicalPlan::Limit(limit) => match limit.input.as_ref() {
/// LogicalPlan::Limit(child_limit) => {
/// // merge limit ...
/// let optimized_plan = merge_limit(limit, child_limit);
/// // `optimized_plan` may itself be optimizable again,
/// // for example when the plan is Limit-Limit-Limit
/// Ok(Some(
/// try_optimize(&optimized_plan)?
/// .unwrap_or_else(|| optimized_plan.clone()),
/// ))
/// }
/// _ => Ok(None),
/// },
/// _ => Ok(None),
/// }
/// }
/// ```
pub enum ApplyOrder {
/// The rule is applied to a plan node first, and then to the inputs of the resulting node.
TopDown,
/// The rule is applied to a plan node's inputs first, and then to the node itself.
BottomUp,
}

impl Default for Optimizer {
fn default() -> Self {
Self::new()
Expand Down Expand Up @@ -253,8 +298,8 @@ impl Optimizer {
debug!("Skipping rule {} due to optimizer config", rule.name());
continue;
}
let result = self.optimize_recursively(rule, &new_plan, config);

let result = rule.try_optimize(&new_plan, config);
match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
Expand Down Expand Up @@ -315,6 +360,78 @@ impl Optimizer {
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}

fn optimize_node(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What value does this function add? In other words, why not call rule.try_optimize directly at the callsite 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to add future TODO:
we can do batch optimize

for rule in rules:
    rule.try_optimize(rule)

&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// TODO: future feature: support batch optimization here.
rule.try_optimize(plan, config)
}

/// Runs `rule` over every input of `plan` through `optimize_recursively` and rebuilds `plan`
/// on top of the results.
///
/// Returns `Ok(None)` when `plan` has no inputs or when none of them was rewritten. Otherwise
/// returns `plan` with its inputs replaced; inputs the rule left untouched are cloned as they
/// are. The first error hit while optimizing an input is propagated to the caller.
fn optimize_inputs(
    &self,
    rule: &Arc<dyn OptimizerRule + Send + Sync>,
    plan: &LogicalPlan,
    config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
    let children = plan.inputs();

    // Optimize each child in order, stopping at the first failure.
    let mut rewritten = Vec::with_capacity(children.len());
    for child in children.iter() {
        rewritten.push(self.optimize_recursively(rule, child, config)?);
    }

    // `any` is false for an empty list, so plans without inputs are covered here as well.
    if !rewritten.iter().any(Option::is_some) {
        return Ok(None);
    }

    // Pair every original child with its outcome and keep the original where nothing changed.
    let new_inputs: Vec<LogicalPlan> = children
        .iter()
        .zip(rewritten)
        .map(|(child, candidate)| candidate.unwrap_or_else(|| (*child).clone()))
        .collect();

    let rebuilt = plan.with_new_inputs(&new_inputs)?;
    Ok(Some(rebuilt))
}

/// Use a rule to optimize the whole plan.
/// If the rule has an `ApplyOrder`, the rule does not need to recursively handle children itself.
pub fn optimize_recursively(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add docstrings about what this API does?

Also, I wonder if it needs to be pub or if it could be private 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes we need to use it in unit tests 😂

&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match rule.apply_order() {
Some(order) => match order {
ApplyOrder::TopDown => {
let optimize_self_opt = self.optimize_node(rule, plan, config)?;
let optimize_inputs_opt = match &optimize_self_opt {
Some(optimized_plan) => {
self.optimize_inputs(rule, optimized_plan, config)?
}
_ => self.optimize_inputs(rule, plan, config)?,
};
Ok(optimize_inputs_opt.or(optimize_self_opt))
}
ApplyOrder::BottomUp => {
let optimize_inputs_opt = self.optimize_inputs(rule, plan, config)?;
let optimize_self_opt = match &optimize_inputs_opt {
Some(optimized_plan) => {
self.optimize_node(rule, optimized_plan, config)?
}
_ => self.optimize_node(rule, plan, config)?,
};
Ok(optimize_self_opt.or(optimize_inputs_opt))
}
},
_ => rule.try_optimize(plan, config),
}
}
}

/// Log the plan in debug/tracing mode after some part of the optimizer runs
Expand Down