Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
mod physical_column_data_scan;
mod physical_create_table;
mod physical_dummy_scan;
mod physical_explain;
mod physical_expression_scan;
mod physical_insert;
mod physical_projection;
mod physical_table_scan;

use derive_new::new;
pub use physical_column_data_scan::*;
pub use physical_create_table::*;
pub use physical_dummy_scan::*;
pub use physical_explain::*;
pub use physical_expression_scan::*;
pub use physical_insert::*;
pub use physical_projection::*;
Expand All @@ -30,4 +34,19 @@ pub enum PhysicalOperator {
PhysicalInsert(PhysicalInsert),
PhysicalTableScan(PhysicalTableScan),
PhysicalProjection(PhysicalProjection),
PhysicalColumnDataScan(PhysicalColumnDataScan),
}

impl PhysicalOperator {
pub fn children(&self) -> &[PhysicalOperator] {
match self {
PhysicalOperator::PhysicalCreateTable(op) => &op.base.children,
PhysicalOperator::PhysicalExpressionScan(op) => &op.base.children,
PhysicalOperator::PhysicalInsert(op) => &op.base.children,
PhysicalOperator::PhysicalTableScan(op) => &op.base.children,
PhysicalOperator::PhysicalProjection(op) => &op.base.children,
PhysicalOperator::PhysicalDummyScan(op) => &op.base.children,
PhysicalOperator::PhysicalColumnDataScan(op) => &op.base.children,
}
}
}
11 changes: 11 additions & 0 deletions src/execution/physical_plan/physical_column_data_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use arrow::record_batch::RecordBatch;
use derive_new::new;

use super::PhysicalOperatorBase;

/// The PhysicalColumnDataScan scans a Arrow RecordBatch
#[derive(new, Clone)]
pub struct PhysicalColumnDataScan {
pub(crate) base: PhysicalOperatorBase,
pub(crate) collection: Vec<RecordBatch>,
}
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::planner_v2::{BoundCreateTableInfo, LogicalCreateTable};
#[derive(new, Clone)]
pub struct PhysicalCreateTable {
#[new(default)]
_base: PhysicalOperatorBase,
pub(crate) base: PhysicalOperatorBase,
pub(crate) info: BoundCreateTableInfo,
}

Expand Down
41 changes: 41 additions & 0 deletions src/execution/physical_plan/physical_explain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::sync::Arc;

use arrow::array::StringArray;
use arrow::record_batch::RecordBatch;

use super::{PhysicalColumnDataScan, PhysicalOperator, PhysicalOperatorBase};
use crate::execution::{PhysicalPlanGenerator, SchemaUtil};
use crate::planner_v2::LogicalExplain;
use crate::util::tree_render::TreeRender;

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_explain(&self, op: LogicalExplain) -> PhysicalOperator {
let types = op.base.types.clone();
let logical_child = op.base.children[0].clone();
// optimized logical plan explain string
let logical_plan_opt_string = TreeRender::logical_plan_tree(&logical_child);

let physical_child = self.create_plan_internal(logical_child);
// physical plan explain string
let physical_plan_string = TreeRender::physical_plan_tree(&physical_child);

let base = PhysicalOperatorBase::new(vec![], types.clone());

let schema = SchemaUtil::new_schema_ref(&["type".to_string(), "plan".to_string()], &types);
let types_column = Arc::new(StringArray::from(vec![
"logical_plan".to_string(),
"logical_plan_opt".to_string(),
"physical_plan".to_string(),
]));
let plans_column = Arc::new(StringArray::from(vec![
op.logical_plan,
logical_plan_opt_string,
physical_plan_string,
]));
let collection = RecordBatch::try_new(schema, vec![types_column, plans_column]).unwrap();
PhysicalOperator::PhysicalColumnDataScan(PhysicalColumnDataScan::new(
base,
vec![collection],
))
}
}
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_expression_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::types_v2::LogicalType;
#[derive(new, Clone)]
pub struct PhysicalExpressionScan {
#[new(default)]
pub(crate) _base: PhysicalOperatorBase,
pub(crate) base: PhysicalOperatorBase,
/// The types of the expressions
pub(crate) expr_types: Vec<LogicalType>,
/// The set of expressions to scan
Expand Down
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::types_v2::LogicalType;

#[derive(new, Clone)]
pub struct PhysicalTableScan {
pub(crate) _base: PhysicalOperatorBase,
pub(crate) base: PhysicalOperatorBase,
pub(crate) bind_table: TableCatalogEntry,
/// The types of ALL columns that can be returned by the table function
pub(crate) returned_types: Vec<LogicalType>,
Expand Down
1 change: 1 addition & 0 deletions src/execution/physical_plan_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl PhysicalPlanGenerator {
LogicalOperator::LogicalGet(op) => self.create_physical_table_scan(op),
LogicalOperator::LogicalProjection(op) => self.create_physical_projection(op),
LogicalOperator::LogicalDummyScan(op) => self.create_physical_dummy_scan(op),
LogicalOperator::LogicalExplain(op) => self.create_physical_explain(op),
}
}
}
21 changes: 21 additions & 0 deletions src/execution/volcano_executor/column_data_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{ExecutionContext, ExecutorError, PhysicalColumnDataScan};

#[derive(new)]
pub struct ColumnDataScan {
pub(crate) plan: PhysicalColumnDataScan,
}

impl ColumnDataScan {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, _context: Arc<ExecutionContext>) {
for batch in self.plan.collection.into_iter() {
yield batch;
}
}
}
5 changes: 5 additions & 0 deletions src/execution/volcano_executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod column_data_scan;
mod create_table;
mod dummy_scan;
mod expression_scan;
Expand All @@ -7,6 +8,7 @@ mod table_scan;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
pub use column_data_scan::*;
pub use create_table::*;
pub use dummy_scan::*;
pub use expression_scan::*;
Expand Down Expand Up @@ -46,6 +48,9 @@ impl VolcanoExecutor {
Projection::new(op, child_executor).execute(context)
}
PhysicalOperator::PhysicalDummyScan(op) => DummyScan::new(op).execute(context),
PhysicalOperator::PhysicalColumnDataScan(op) => {
ColumnDataScan::new(op).execute(context)
}
}
}

Expand Down
34 changes: 34 additions & 0 deletions src/planner_v2/binder/statement/bind_explain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use sqlparser::ast::Statement;

use super::BoundStatement;
use crate::planner_v2::{
BindError, Binder, ExplainType, LogicalExplain, LogicalOperator, LogicalOperatorBase,
};
use crate::types_v2::LogicalType;
use crate::util::tree_render::TreeRender;

impl Binder {
pub fn bind_explain(&mut self, stmt: &Statement) -> Result<BoundStatement, BindError> {
match stmt {
Statement::Explain {
statement, analyze, ..
} => {
let bound_stmt = self.bind(statement)?;
let explain_type = if *analyze {
ExplainType::ANALYZE
} else {
ExplainType::STANDARD
};

let types = vec![LogicalType::Varchar, LogicalType::Varchar];
let names = vec!["explain_type".to_string(), "explain_value".to_string()];
let logical_plan_string = TreeRender::logical_plan_tree(&bound_stmt.plan);
let base = LogicalOperatorBase::new(vec![bound_stmt.plan], vec![], vec![]);
let logical_explain = LogicalExplain::new(base, explain_type, logical_plan_string);
let new_plan = LogicalOperator::LogicalExplain(logical_explain);
Ok(BoundStatement::new(new_plan, types, names))
}
_ => Err(BindError::UnsupportedStmt(format!("{:?}", stmt))),
}
}
}
2 changes: 2 additions & 0 deletions src/planner_v2/binder/statement/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod bind_create;
mod bind_explain;
mod bind_insert;
mod bind_select;
mod create_info;
Expand Down Expand Up @@ -26,6 +27,7 @@ impl Binder {
Statement::CreateTable { .. } => self.bind_create_table(statement),
Statement::Insert { .. } => self.bind_insert(statement),
Statement::Query { .. } => self.bind_select(statement),
Statement::Explain { .. } => self.bind_explain(statement),
_ => Err(BindError::UnsupportedStmt(format!("{:?}", statement))),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/planner_v2/logical_operator_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub trait LogicalOperatorVisitor {
}

fn visit_operator_children(&mut self, op: &mut LogicalOperator) {
for child in op.children() {
for child in op.children_mut() {
self.visit_operator(child);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/planner_v2/operator/logical_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use derive_new::new;
use super::LogicalOperatorBase;
use crate::planner_v2::BoundCreateTableInfo;

#[derive(new, Debug)]
#[derive(new, Debug, Clone)]
pub struct LogicalCreateTable {
#[new(default)]
pub(crate) base: LogicalOperatorBase,
Expand Down
2 changes: 1 addition & 1 deletion src/planner_v2/operator/logical_dummy_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use derive_new::new;
use super::LogicalOperatorBase;

/// LogicalDummyScan represents a dummy scan returning nothing.
#[derive(new, Debug)]
#[derive(new, Debug, Clone)]
pub struct LogicalDummyScan {
#[new(default)]
pub(crate) base: LogicalOperatorBase,
Expand Down
18 changes: 18 additions & 0 deletions src/planner_v2/operator/logical_explain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use derive_new::new;

use super::LogicalOperatorBase;

#[derive(new, Debug, Clone)]
pub struct LogicalExplain {
pub(crate) base: LogicalOperatorBase,
#[allow(dead_code)]
pub(crate) explain_type: ExplainType,
/// un-optimized logical plan explain string
pub(crate) logical_plan: String,
}

#[derive(Debug, Clone)]
pub enum ExplainType {
STANDARD,
ANALYZE,
}
2 changes: 1 addition & 1 deletion src/planner_v2/operator/logical_expression_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::planner_v2::BoundExpression;
use crate::types_v2::LogicalType;

/// LogicalExpressionGet represents a scan operation over a set of to-be-executed expressions
#[derive(new, Debug)]
#[derive(new, Debug, Clone)]
pub struct LogicalExpressionGet {
pub(crate) base: LogicalOperatorBase,
pub(crate) table_idx: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/planner_v2/operator/logical_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::catalog_v2::TableCatalogEntry;
use crate::types_v2::LogicalType;

/// LogicalGet represents a scan operation from a data source
#[derive(new, Debug)]
#[derive(new, Debug, Clone)]
pub struct LogicalGet {
pub(crate) base: LogicalOperatorBase,
pub(crate) table_idx: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/planner_v2/operator/logical_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::LogicalOperatorBase;
use crate::catalog_v2::TableCatalogEntry;
use crate::types_v2::LogicalType;

#[derive(new, Debug)]
#[derive(new, Debug, Clone)]
pub struct LogicalInsert {
pub(crate) base: LogicalOperatorBase,
/// The insertion map ([table_index -> index in result, or INVALID_INDEX if not specified])
Expand Down
2 changes: 1 addition & 1 deletion src/planner_v2/operator/logical_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use derive_new::new;

use super::LogicalOperatorBase;

#[derive(new, Debug)]
#[derive(new, Debug, Clone)]
pub struct LogicalProjection {
pub(crate) base: LogicalOperatorBase,
pub(crate) table_idx: usize,
Expand Down
Loading