Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::optimizer::{
RemoveNoopOperators, SimplifyCasts,
};
use crate::parser::parse;
use crate::planner::{LogicalPlanError, Planner};
use crate::planner::{LogicalPlanError, Planner, PlannerContext};
use crate::storage::{CsvStorage, Storage, StorageError, StorageImpl};
use crate::util::pretty_plan_tree_string;

Expand Down Expand Up @@ -54,7 +54,7 @@ impl Database {
Ok(data)
}

fn default_optimizer(&self, root: PlanRef) -> HepOptimizer {
fn default_optimizer(&self, root: PlanRef, planner_context: PlannerContext) -> HepOptimizer {
// the order of rules is important and affects the rule matching logic
let batches = vec![
HepBatch::new(
Expand Down Expand Up @@ -101,7 +101,7 @@ impl Database {
),
];

HepOptimizer::new(batches, root)
HepOptimizer::new(batches, root, planner_context)
}

pub async fn run(&self, sql: &str) -> Result<Vec<RecordBatch>, DatabaseError> {
Expand All @@ -123,15 +123,15 @@ impl Database {
println!("bound_stmt:\n{:#?}\n", bound_stmt);

// 3. convert bound stmts to logical plan
let planner = Planner {};
let mut planner = Planner::default();
let logical_plan = planner.plan(bound_stmt)?;
println!(
"original_plan:\n{}\n",
pretty_plan_tree_string(&*logical_plan)
);

// 4. optimize logical plan to physical plan
let mut optimizer = self.default_optimizer(logical_plan);
let mut optimizer = self.default_optimizer(logical_plan, planner.context);
let physical_plan = optimizer.find_best();
println!(
"optimized_plan:\n{}\n",
Expand Down Expand Up @@ -165,15 +165,15 @@ impl Database {
let bound_stmt = binder.bind(&stats[0])?;

let mut explain_str = String::new();
let planner = Planner {};
let mut planner = Planner::default();
let logical_plan = planner.plan(bound_stmt)?;
_ = write!(
explain_str,
"original plan:\n{}\n",
pretty_plan_tree_string(&*logical_plan)
);

let mut optimizer = self.default_optimizer(logical_plan);
let mut optimizer = self.default_optimizer(logical_plan, planner.context);
let physical_plan = optimizer.find_best();
_ = write!(
explain_str,
Expand Down
10 changes: 5 additions & 5 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use self::project::ProjectExecutor;
use self::table_scan::TableScanExecutor;
use crate::optimizer::{
PhysicalCrossJoin, PhysicalFilter, PhysicalHashAgg, PhysicalHashJoin, PhysicalLimit,
PhysicalOrder, PhysicalProject, PhysicalSimpleAgg, PhysicalTableScan, PlanNode, PlanRef,
PlanTreeNode, PlanVisitor,
PhysicalOrder, PhysicalProject, PhysicalSimpleAgg, PhysicalTableScan, PlanRef, PlanTreeNode,
PlanVisitor,
};
use crate::storage::{StorageError, StorageImpl};

Expand Down Expand Up @@ -107,7 +107,7 @@ impl PlanVisitor<BoxedExecutor> for ExecutorBuilder {
right_child: self.visit(plan.right()).unwrap(),
join_type: plan.join_type(),
join_condition: plan.join_condition(),
join_output_schema: plan.output_columns(),
join_output_schema: plan.join_output_columns(),
}
.execute(),
)
Expand All @@ -118,7 +118,7 @@ impl PlanVisitor<BoxedExecutor> for ExecutorBuilder {
CrossJoinExecutor {
left_child: self.visit(plan.left()).unwrap(),
right_child: self.visit(plan.right()).unwrap(),
join_output_schema: plan.output_columns(),
join_output_schema: plan.join_output_columns(),
}
.execute(),
)
Expand Down Expand Up @@ -251,7 +251,7 @@ mod executor_test {
println!("bound_stmt = {:#?}", bound_stmt);

// convert bound stmts to logical plan
let planner = Planner {};
let mut planner = Planner::default();
let logical_plan = planner.plan(bound_stmt)?;
println!("logical_plan = {:#?}", logical_plan);
let mut input_ref_rewriter = InputRefRewriter::default();
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/core/rule.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use enum_dispatch::enum_dispatch;

use super::{OptExpr, Pattern};
use crate::planner::PlannerContext;

/// A rule is to transform logically equivalent expression. There are two kinds of rules:
///
Expand All @@ -13,7 +14,7 @@ pub trait Rule {

/// Apply the rule and write the transformation result to `Substitute`.
/// The pattern tree determines the opt_expr tree internal nodes type.
fn apply(&self, opt_expr: OptExpr, result: &mut Substitute);
fn apply(&self, opt_expr: OptExpr, result: &mut Substitute, planner_context: &PlannerContext);
}

/// Define the transformed plans
Expand Down
14 changes: 10 additions & 4 deletions src/optimizer/heuristic/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ use super::matcher::HepMatcher;
use crate::optimizer::core::{PatternMatcher, Rule, Substitute};
use crate::optimizer::rules::RuleImpl;
use crate::optimizer::PlanRef;
use crate::planner::PlannerContext;
use crate::util::pretty_plan_tree_string;

pub struct HepOptimizer {
batches: Vec<HepBatch>,
graph: HepGraph,
planner_context: PlannerContext,
}

impl HepOptimizer {
pub fn new(batches: Vec<HepBatch>, root: PlanRef) -> Self {
pub fn new(batches: Vec<HepBatch>, root: PlanRef, planner_context: PlannerContext) -> Self {
let graph = HepGraph::new(root);
Self { batches, graph }
Self {
batches,
graph,
planner_context,
}
}

pub fn find_best(&mut self) -> PlanRef {
Expand Down Expand Up @@ -97,7 +103,7 @@ impl HepOptimizer {
if let Some(opt_expr) = matcher.match_opt_expr() {
let mut substitute = Substitute::default();
let opt_expr_root = opt_expr.root.clone();
rule.apply(opt_expr, &mut substitute);
rule.apply(opt_expr, &mut substitute, &self.planner_context);

if !substitute.opt_exprs.is_empty() {
assert!(substitute.opt_exprs.len() == 1);
Expand Down Expand Up @@ -172,7 +178,7 @@ mod tests {
HepBatchStrategy::once_topdown(),
vec![PhysicalRewriteRule::create()],
);
let mut planner = HepOptimizer::new(vec![batch], root);
let mut planner = HepOptimizer::new(vec![batch], root, Default::default());
let new_plan = planner.find_best();
assert_eq!(
new_plan.as_physical_project().unwrap().logical().exprs()[0],
Expand Down
6 changes: 5 additions & 1 deletion src/optimizer/plan_node/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ impl PlanNode for Dummy {
vec![]
}

fn output_columns(&self) -> Vec<ColumnCatalog> {
fn output_columns(&self, _base_table_id: String) -> Vec<ColumnCatalog> {
vec![]
}

fn get_based_table_id(&self) -> crate::catalog::TableId {
"Dummy".to_string()
}
}

impl PlanTreeNode for Dummy {
Expand Down
16 changes: 12 additions & 4 deletions src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use super::{PlanNode, PlanRef, PlanTreeNode};
use crate::binder::BoundExpr;
use crate::catalog::ColumnCatalog;
use crate::catalog::{ColumnCatalog, TableId};

#[derive(Debug, Clone)]
pub struct LogicalAgg {
Expand Down Expand Up @@ -36,16 +36,24 @@ impl LogicalAgg {

impl PlanNode for LogicalAgg {
fn referenced_columns(&self) -> Vec<ColumnCatalog> {
self.output_columns()
self.group_by
.iter()
.chain(self.agg_funcs.iter())
.flat_map(|e| e.get_referenced_column_catalog())
.collect::<Vec<_>>()
}

fn output_columns(&self) -> Vec<ColumnCatalog> {
fn output_columns(&self, base_table_id: String) -> Vec<ColumnCatalog> {
self.group_by
.iter()
.chain(self.agg_funcs.iter())
.flat_map(|e| e.get_referenced_column_catalog())
.map(|e| e.output_column_catalog_for_alias_table(base_table_id.clone()))
.collect::<Vec<_>>()
}

fn get_based_table_id(&self) -> TableId {
self.children()[0].get_based_table_id()
}
}

impl PlanTreeNode for LogicalAgg {
Expand Down
10 changes: 7 additions & 3 deletions src/optimizer/plan_node/logical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use super::{PlanNode, PlanRef, PlanTreeNode};
use crate::binder::BoundExpr;
use crate::catalog::ColumnCatalog;
use crate::catalog::{ColumnCatalog, TableId};

#[derive(Debug, Clone)]
pub struct LogicalFilter {
Expand Down Expand Up @@ -32,8 +32,12 @@ impl PlanNode for LogicalFilter {
self.expr.get_referenced_column_catalog()
}

fn output_columns(&self) -> Vec<ColumnCatalog> {
self.children()[0].output_columns()
fn output_columns(&self, base_table_id: String) -> Vec<ColumnCatalog> {
self.children()[0].output_columns(base_table_id)
}

fn get_based_table_id(&self) -> TableId {
self.children()[0].get_based_table_id()
}
}

Expand Down
Loading