Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ pub fn main() -> Result<()> {
// run the analyzer with our custom rule
let config = OptimizerContext::default().with_skip_failing_rules(false);
let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]);
let analyzed_plan = analyzer.execute_and_check(&logical_plan, config.options())?;
let analyzed_plan =
analyzer.execute_and_check(&logical_plan, config.options(), |_, _| {})?;
println!(
"Analyzed Logical Plan:\n\n{}\n",
analyzed_plan.display_indent()
Expand Down
38 changes: 34 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,9 +1675,37 @@ impl SessionState {
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

let analyzed_plan = self
.analyzer
.execute_and_check(e.plan.as_ref(), self.options())?;
// analyze & capture output of each rule
let analyzed_plan = match self.analyzer.execute_and_check(
e.plan.as_ref(),
self.options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
stringified_plans.push(analyzed_plan.to_stringified(plan_type));
},
) {
Ok(plan) => plan,
Err(DataFusionError::Context(analyzer_name, err)) => {
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, err.to_string()));

return Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan: e.plan.clone(),
stringified_plans,
schema: e.schema.clone(),
logical_optimization_succeeded: false,
}));
}
Err(e) => return Err(e),
};

// to delineate the analyzer & optimizer phases in explain output
stringified_plans
.push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));

// optimize the child plan, capturing the output of each optimizer
let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
&analyzed_plan,
Expand Down Expand Up @@ -1706,7 +1734,9 @@ impl SessionState {
logical_optimization_succeeded,
}))
} else {
let analyzed_plan = self.analyzer.execute_and_check(plan, self.options())?;
let analyzed_plan =
self.analyzer
.execute_and_check(plan, self.options(), |_, _| {})?;
self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/subqueries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async fn invalid_scalar_subquery() -> Result<()> {
let dataframe = ctx.sql(sql).await.expect(&msg);
let err = dataframe.into_optimized_plan().err().unwrap();
assert_eq!(
"Plan(\"Scalar subquery should only return one column\")",
r#"Context("check_analyzed_plan", Plan("Scalar subquery should only return one column"))"#,
&format!("{err:?}")
);

Expand All @@ -203,7 +203,7 @@ async fn subquery_not_allowed() -> Result<()> {
let err = dataframe.into_optimized_plan().err().unwrap();

assert_eq!(
"Plan(\"In/Exist subquery can not be used in Sort plan nodes\")",
r#"Context("check_analyzed_plan", Plan("In/Exist subquery can not be used in Sort plan nodes"))"#,
&format!("{err:?}")
);

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sqllogictests/test_files/dates.slt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ g
h

## Plan error when comparing Utf8 and timestamp in where clause
statement error DataFusion error: Error during planning: Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 can't be evaluated because there isn't a common type to coerce the types to
statement error Error during planning: Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 can't be evaluated because there isn't a common type to coerce the types to
select i_item_desc from test
where d3_date > now() + '5 days';

Expand Down
11 changes: 11 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,13 @@ pub enum Partitioning {
pub enum PlanType {
/// The initial LogicalPlan provided to DataFusion
InitialLogicalPlan,
/// The LogicalPlan which results from applying an analyzer pass
AnalyzedLogicalPlan {
/// The name of the analyzer which produced this plan
analyzer_name: String,
},
/// The LogicalPlan after all analyzer passes have been applied
FinalAnalyzedLogicalPlan,
/// The LogicalPlan which results from applying an optimizer pass
OptimizedLogicalPlan {
/// The name of the optimizer which produced this plan
Expand All @@ -1835,6 +1842,10 @@ impl Display for PlanType {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
PlanType::AnalyzedLogicalPlan { analyzer_name } => {
write!(f, "logical_plan after {analyzer_name}")
}
PlanType::FinalAnalyzedLogicalPlan => write!(f, "analyzed_logical_plan"),
PlanType::OptimizedLogicalPlan { optimizer_name } => {
write!(f, "logical_plan after {optimizer_name}")
}
Expand Down
18 changes: 14 additions & 4 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,29 @@ impl Analyzer {

/// Analyze the logical plan by applying the analyzer rules, then perform the
/// necessary checks and fail on invalid plans
pub fn execute_and_check(
pub fn execute_and_check<F>(
&self,
plan: &LogicalPlan,
config: &ConfigOptions,
) -> Result<LogicalPlan> {
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
let start_time = Instant::now();
let mut new_plan = plan.clone();

// TODO add common rule executor for Analyzer and Optimizer
for rule in &self.rules {
new_plan = rule.analyze(new_plan, config)?;
new_plan = rule.analyze(new_plan, config).map_err(|e| {
DataFusionError::Context(rule.name().to_string(), Box::new(e))
})?;
observer(&new_plan, rule.as_ref());
}
check_plan(&new_plan)?;
// for easier display in explain output
check_plan(&new_plan).map_err(|e| {
DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
})?;
log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ mod test {
.err()
.unwrap();
assert_eq!(
"Plan(\"Coercion from [Utf8] to the signature Uniform(1, [Int32]) failed.\")",
r#"Context("type_coercion", Plan("Coercion from [Utf8] to the signature Uniform(1, [Int32]) failed."))"#,
&format!("{err:?}")
);
Ok(())
Expand Down Expand Up @@ -914,7 +914,7 @@ mod test {
.err()
.unwrap();
assert_eq!(
"Plan(\"Coercion from [Utf8] to the signature Uniform(1, [Float64]) failed.\")",
r#"Context("type_coercion", Plan("Coercion from [Utf8] to the signature Uniform(1, [Float64]) failed."))"#,
&format!("{err:?}")
);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub fn assert_analyzed_plan_eq(
) -> Result<()> {
let options = ConfigOptions::default();
let analyzed_plan =
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options)?;
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = format!("{analyzed_plan:?}");
assert_eq!(formatted_plan, expected);

Expand Down
8 changes: 3 additions & 5 deletions datafusion/optimizer/tests/integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_optimizer::{OptimizerConfig, OptimizerContext};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
Expand Down Expand Up @@ -351,8 +351,8 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let analyzer = Analyzer::new();
let optimizer = Optimizer::new();
// analyze and optimize the logical plan
let plan = analyzer.execute_and_check(&plan, config.options())?;
optimizer.optimize(&plan, &config, &observe)
let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
optimizer.optimize(&plan, &config, |_, _| {})
}

#[derive(Default)]
Expand Down Expand Up @@ -412,8 +412,6 @@ impl ContextProvider for MySchemaProvider {
}
}

fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

struct MyTableSource {
schema: SchemaRef,
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,10 @@ message ArrowType{
//}
message EmptyMessage{}

// Payload of the `AnalyzedLogicalPlan` case of `PlanType`: identifies a plan
// captured after an individual analyzer pass.
message AnalyzedLogicalPlanType {
// Name of the analyzer associated with this plan entry
// (mirrors `analyzer_name` on the Rust `PlanType::AnalyzedLogicalPlan`).
string analyzer_name = 1;
}

// Payload of the `OptimizedLogicalPlan` case of `PlanType`: identifies a plan
// captured after an individual optimizer pass.
message OptimizedLogicalPlanType {
// Name of the optimizer associated with this plan entry
// (mirrors `optimizer_name` on the Rust `PlanType::OptimizedLogicalPlan`).
string optimizer_name = 1;
}
Expand All @@ -937,6 +941,8 @@ message OptimizedPhysicalPlanType {
message PlanType {
oneof plan_type_enum {
EmptyMessage InitialLogicalPlan = 1;
AnalyzedLogicalPlanType AnalyzedLogicalPlan = 7;
EmptyMessage FinalAnalyzedLogicalPlan = 8;
OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
EmptyMessage FinalLogicalPlan = 3;
EmptyMessage InitialPhysicalPlan = 4;
Expand Down
118 changes: 118 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading