Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ select a, (select max(b) from t1) max_b from t1;
-- supported in Roadmap 0.6 (planner_v2)
-- create and insert table in memory
create table t1(v1 int, v2 int, v3 int);
create table t2 as select * from read_csv('t2.csv');
insert into t1 values (0, 4, 1), (1, 5, 2);
select * from t1;
-- select only expressions
Expand Down
2 changes: 1 addition & 1 deletion src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum PhysicalOperator {
PhysicalCreateTable(PhysicalCreateTable),
PhysicalDummyScan(PhysicalDummyScan),
PhysicalExpressionScan(PhysicalExpressionScan),
PhysicalInsert(PhysicalInsert),
PhysicalInsert(Box<PhysicalInsert>),
PhysicalTableScan(PhysicalTableScan),
PhysicalProjection(PhysicalProjection),
PhysicalColumnDataScan(PhysicalColumnDataScan),
Expand Down
14 changes: 12 additions & 2 deletions src/execution/physical_plan/physical_create_table.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use derive_new::new;

use super::{PhysicalOperator, PhysicalOperatorBase};
use super::{PhysicalInsert, PhysicalOperator, PhysicalOperatorBase};
use crate::execution::PhysicalPlanGenerator;
use crate::planner_v2::{BoundCreateTableInfo, LogicalCreateTable};

Expand All @@ -13,6 +13,16 @@ pub struct PhysicalCreateTable {

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_create_table(&self, op: LogicalCreateTable) -> PhysicalOperator {
PhysicalOperator::PhysicalCreateTable(PhysicalCreateTable::new(op.info))
if let Some(query) = op.info.query.clone() {
// create table as select
let query_plan = self.create_plan(*query);
let base = PhysicalOperatorBase::new(vec![query_plan], vec![]);
PhysicalOperator::PhysicalInsert(Box::new(PhysicalInsert::new_create_table_as(
base, op.info,
)))
} else {
// create table
PhysicalOperator::PhysicalCreateTable(PhysicalCreateTable::new(op.info))
}
}
}
44 changes: 37 additions & 7 deletions src/execution/physical_plan/physical_insert.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use derive_new::new;

use super::{PhysicalOperator, PhysicalOperatorBase};
use crate::catalog_v2::TableCatalogEntry;
use crate::execution::PhysicalPlanGenerator;
use crate::planner_v2::LogicalInsert;
use crate::planner_v2::{BoundCreateTableInfo, LogicalInsert};
use crate::types_v2::LogicalType;

#[derive(new, Clone)]
#[derive(Clone)]
pub struct PhysicalInsert {
pub(crate) base: PhysicalOperatorBase,
/// The insertion map ([table_index -> index in result, or INVALID_INDEX if not specified])
pub(crate) column_index_list: Vec<usize>,
/// The expected types for the INSERT statement
pub(crate) expected_types: Vec<LogicalType>,
pub(crate) table: TableCatalogEntry,
/// The table to insert into; `None` for a `CREATE TABLE ... AS` statement
pub(crate) table: Option<TableCatalogEntry>,
/// For create table as statement
pub(crate) create_table_info: Option<BoundCreateTableInfo>,
}

impl PhysicalInsert {
Expand All @@ -23,18 +24,47 @@ impl PhysicalInsert {
column_index_list: self.column_index_list.clone(),
expected_types: self.expected_types.clone(),
table: self.table.clone(),
create_table_info: self.create_table_info.clone(),
}
}

/// Build a `PhysicalInsert` for a plain `INSERT INTO` statement.
///
/// The destination table is already known at plan time, so `table` is
/// populated and `create_table_info` stays empty.
pub fn new_insert_into(
    base: PhysicalOperatorBase,
    column_index_list: Vec<usize>,
    expected_types: Vec<LogicalType>,
    table: TableCatalogEntry,
) -> Self {
    let table = Some(table);
    Self {
        base,
        table,
        create_table_info: None,
        column_index_list,
        expected_types,
    }
}

/// Build a `PhysicalInsert` that backs a `CREATE TABLE ... AS <query>`.
///
/// No destination table exists yet: the executor creates it from
/// `create_table_info` before inserting the query results, so the
/// column-index map and expected types are left empty here.
pub fn new_create_table_as(
    base: PhysicalOperatorBase,
    create_table_info: BoundCreateTableInfo,
) -> Self {
    Self {
        base,
        create_table_info: Some(create_table_info),
        table: None,
        expected_types: Vec::new(),
        column_index_list: Vec::new(),
    }
}
}

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_insert(&self, op: LogicalInsert) -> PhysicalOperator {
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalInsert(PhysicalInsert::new(
PhysicalOperator::PhysicalInsert(Box::new(PhysicalInsert::new_insert_into(
base,
op.column_index_list,
op.expected_types,
op.table,
))
)))
}
}
22 changes: 16 additions & 6 deletions src/execution/volcano_executor/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures_async_stream::try_stream;

use crate::catalog_v2::{Catalog, DataTable, DataTableInfo};
use crate::execution::{ExecutionContext, ExecutorError, PhysicalCreateTable};
use crate::planner_v2::BoundCreateTableInfo;
use crate::storage_v2::LocalStorage;

#[derive(new)]
Expand All @@ -17,22 +18,31 @@ pub struct CreateTable {
}

impl CreateTable {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, context: Arc<ExecutionContext>) {
let schema = self.plan.info.base.base.schema;
let table = self.plan.info.base.table;
let column_definitions = self.plan.info.base.columns;
pub fn create_table(
context: Arc<ExecutionContext>,
info: &BoundCreateTableInfo,
) -> Result<DataTable, ExecutorError> {
let schema = info.base.base.schema.clone();
let table = info.base.table.clone();
let column_definitions = info.base.columns.clone();
let data_table = DataTable::new(
DataTableInfo::new(schema.clone(), table.clone()),
column_definitions,
);
Catalog::create_table(
context.clone_client_context(),
schema,
table.clone(),
table,
data_table.clone(),
)?;
LocalStorage::init_table(context.clone_client_context(), &data_table);
Ok(data_table)
}

#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, context: Arc<ExecutionContext>) {
let table = self.plan.info.base.table.clone();
Self::create_table(context, &self.plan.info)?;
let array = Arc::new(StringArray::from(vec![format!("CREATE TABLE {}", table)]));
let fields = vec![Field::new("success", DataType::Utf8, false)];
yield RecordBatch::try_new(
Expand Down
44 changes: 40 additions & 4 deletions src/execution/volcano_executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use arrow::record_batch::RecordBatch;
use derive_new::new;
use futures_async_stream::try_stream;

use super::CreateTable;
use crate::catalog_v2::DataTable;
use crate::execution::{
BoxedExecutor, ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalInsert,
};
Expand All @@ -23,12 +25,13 @@ pub struct Insert {
}

impl Insert {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, context: Arc<ExecutionContext>) {
let table = self.plan.table.storage;
fn insert_into_bound_info(
plan: PhysicalInsert,
) -> Result<(DataTable, Arc<Schema>, Vec<BoundExpression>), ExecutorError> {
let table = plan.table.unwrap().storage;
let mut exprs = vec![];
let mut fields = vec![];
for (table_col_idx, col_insert_idx) in self.plan.column_index_list.iter().enumerate() {
for (table_col_idx, col_insert_idx) in plan.column_index_list.iter().enumerate() {
let column = table.column_definitions[table_col_idx].clone();
fields.push(Field::new(
column.name.as_str(),
Expand All @@ -51,6 +54,39 @@ impl Insert {
}
}
let schema = SchemaRef::new(Schema::new_with_metadata(fields.clone(), HashMap::new()));
Ok((table, schema, exprs))
}

/// Build the insertion schema and bound expressions for `CREATE TABLE AS`.
///
/// Every column of the freshly created table is bound positionally: the
/// i-th column of the source batch maps onto the i-th column definition,
/// so each expression is a plain `BoundReferenceExpression` by index.
///
/// Returns the table handle back to the caller together with the Arrow
/// schema and the per-column reference expressions.
pub fn create_table_bound_info(
    table: DataTable,
) -> Result<(DataTable, Arc<Schema>, Vec<BoundExpression>), ExecutorError> {
    // Build (field, expr) pairs in one pass; `unzip` splits them apart.
    let (fields, exprs): (Vec<_>, Vec<_>) = table
        .column_definitions
        .iter()
        .enumerate()
        .map(|(idx, column)| {
            let field = Field::new(column.name.as_str(), column.ty.clone().into(), true);
            let base = BoundExpressionBase::new("".to_string(), column.ty.clone());
            let expr = BoundExpression::BoundReferenceExpression(
                BoundReferenceExpression::new(base, idx),
            );
            (field, expr)
        })
        .unzip();
    // `fields` is moved here instead of cloned — it is not used afterwards.
    let schema = SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new()));
    Ok((table, schema, exprs))
}

#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, context: Arc<ExecutionContext>) {
let (table, schema, exprs) = if let Some(create_table_info) = self.plan.create_table_info {
// create table as
let table = CreateTable::create_table(context.clone(), &create_table_info)?;
Self::create_table_bound_info(table)?
} else {
// insert into
Self::insert_into_bound_info(self.plan)?
};
#[for_await]
for batch in self.child {
let batch = batch?;
Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl VolcanoExecutor {
PhysicalOperator::PhysicalInsert(op) => {
let child = op.base.children.first().unwrap().clone();
let child_executor = self.build(child, context.clone());
Insert::new(op, child_executor).execute(context)
Insert::new(*op, child_executor).execute(context)
}
PhysicalOperator::PhysicalTableScan(op) => TableScan::new(op).execute(context),
PhysicalOperator::PhysicalProjection(op) => {
Expand Down
6 changes: 3 additions & 3 deletions src/planner_v2/binder/query_node/bind_select_node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use derive_new::new;
use sqlparser::ast::{Ident, Query};
use sqlparser::ast::{Ident, SetExpr};

use super::BoundResultModifier;
use crate::planner_v2::{
Expand Down Expand Up @@ -34,9 +34,9 @@ pub struct BoundSelectNode {
}

impl Binder {
pub fn bind_select_node(&mut self, select_node: &Query) -> Result<BoundSelectNode, BindError> {
pub fn bind_query_body(&mut self, query_body: &SetExpr) -> Result<BoundSelectNode, BindError> {
let projection_index = self.generate_table_index();
let mut bound_select_node = match &*select_node.body {
let mut bound_select_node = match query_body {
sqlparser::ast::SetExpr::Select(select) => self.bind_select_body(select)?,
sqlparser::ast::SetExpr::Query(_) => todo!(),
sqlparser::ast::SetExpr::SetOperation { .. } => todo!(),
Expand Down
6 changes: 3 additions & 3 deletions src/planner_v2/binder/sqlparser_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,16 @@ impl SqlparserQueryBuilder {
}
}

pub fn build(self) -> Query {
Query {
pub fn build(self) -> Box<Query> {
Box::new(Query {
with: None,
body: self.body,
order_by: vec![],
limit: None,
offset: None,
fetch: None,
lock: None,
}
})
}
}

Expand Down
52 changes: 44 additions & 8 deletions src/planner_v2/binder/statement/bind_create.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use itertools::Itertools;
use sqlparser::ast::Statement;

use super::BoundStatement;
Expand All @@ -9,15 +10,35 @@ use crate::planner_v2::{
use crate::types_v2::LogicalType;

impl Binder {
pub fn bind_create_table(&self, stmt: &Statement) -> Result<BoundStatement, BindError> {
pub fn bind_create_table(&mut self, stmt: &Statement) -> Result<BoundStatement, BindError> {
match stmt {
Statement::CreateTable { name, columns, .. } => {
Statement::CreateTable {
name,
columns,
query,
..
} => {
let (schema, table) = SqlparserResolver::object_name_to_schema_table(name)?;
let column_definitions = columns
.iter()
.map(SqlparserResolver::column_def_to_column_definition)
.try_collect()?;
let bound_info = BoundCreateTableInfo::new(schema, table, column_definitions);
let (column_definitions, query) = if let Some(query) = query {
// create table columns based on query names and types
let select = self.bind_query(query)?;
let cols = select
.names
.into_iter()
.zip_eq(select.types.into_iter())
.map(|(name, ty)| ColumnDefinition::new(name, ty))
.collect::<Vec<_>>();
(cols, Some(Box::new(select.plan)))
} else {
// create table columns based on input column_def
let cols = columns
.iter()
.map(SqlparserResolver::column_def_to_column_definition)
.try_collect()?;
(cols, None)
};
let bound_info =
BoundCreateTableInfo::new(schema, table, column_definitions, query);
let plan = LogicalOperator::LogicalCreateTable(LogicalCreateTable::new(bound_info));
Ok(BoundStatement::new(
plan,
Expand All @@ -33,14 +54,29 @@ impl Binder {
#[derive(Debug, Clone)]
pub struct BoundCreateTableInfo {
pub(crate) base: CreateTableInfo,
/// CREATE TABLE from QUERY
pub(crate) query: Option<Box<LogicalOperator>>,
}

impl BoundCreateTableInfo {
pub fn new(schema: String, table: String, column_definitions: Vec<ColumnDefinition>) -> Self {
/// Assemble the bound create-table info from its parts.
///
/// `query` is `Some` for `CREATE TABLE ... AS <query>` and `None` for a
/// plain column-list definition.
pub fn new(
    schema: String,
    table: String,
    column_definitions: Vec<ColumnDefinition>,
    query: Option<Box<LogicalOperator>>,
) -> Self {
    let base = CreateTableInfo::new(CreateInfoBase::new(schema), table, column_definitions);
    Self { base, query }
}

/// Wrap an already-built `CreateTableInfo` together with the bound source
/// query plan of a `CREATE TABLE ... AS` statement.
pub fn new_create_as_query(base: CreateTableInfo, query: LogicalOperator) -> Self {
    let query = Some(Box::new(query));
    Self { base, query }
}
}
3 changes: 1 addition & 2 deletions src/planner_v2/binder/statement/bind_explain_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ impl Binder {
.selection_col_eq_string("table_name", table_name.as_str())
.build();
let query = SqlparserQueryBuilder::new_from_select(select).build();
let node = self.bind_select_node(&query)?;
self.create_plan_for_select_node(node)
self.bind_query(&query)
}
_ => Err(BindError::UnsupportedStmt(format!("{:?}", stmt))),
}
Expand Down
3 changes: 1 addition & 2 deletions src/planner_v2/binder/statement/bind_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ impl Binder {
}
}

let select_node = self.bind_select_node(source)?;
let select_node = self.bind_query(source)?;
let expected_columns_cnt = named_column_indices.len();

let select_node = self.create_plan_for_select_node(select_node)?;
let inserted_types = select_node.types;
let mut plan = select_node.plan;
Self::check_insert_column_count_mismatch(
Expand Down
Loading