diff --git a/README.md b/README.md index 4f5bb44..9564221 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ select a, (select max(b) from t1) max_b from t1; -- supported in Roadmap 0.6 (planner_v2) -- create and insert table in memory create table t1(v1 int, v2 int, v3 int); +create table t2 as select * from read_csv('t2.csv'); insert into t1 values (0, 4, 1), (1, 5, 2); select * from t1; -- select only expressions diff --git a/src/execution/physical_plan/mod.rs b/src/execution/physical_plan/mod.rs index e9230f4..1175556 100644 --- a/src/execution/physical_plan/mod.rs +++ b/src/execution/physical_plan/mod.rs @@ -35,7 +35,7 @@ pub enum PhysicalOperator { PhysicalCreateTable(PhysicalCreateTable), PhysicalDummyScan(PhysicalDummyScan), PhysicalExpressionScan(PhysicalExpressionScan), - PhysicalInsert(PhysicalInsert), + PhysicalInsert(Box<PhysicalInsert>), PhysicalTableScan(PhysicalTableScan), PhysicalProjection(PhysicalProjection), PhysicalColumnDataScan(PhysicalColumnDataScan), diff --git a/src/execution/physical_plan/physical_create_table.rs b/src/execution/physical_plan/physical_create_table.rs index 0f882a1..4f12f09 100644 --- a/src/execution/physical_plan/physical_create_table.rs +++ b/src/execution/physical_plan/physical_create_table.rs @@ -1,6 +1,6 @@ use derive_new::new; -use super::{PhysicalOperator, PhysicalOperatorBase}; +use super::{PhysicalInsert, PhysicalOperator, PhysicalOperatorBase}; use crate::execution::PhysicalPlanGenerator; use crate::planner_v2::{BoundCreateTableInfo, LogicalCreateTable}; @@ -13,6 +13,16 @@ pub struct PhysicalCreateTable { impl PhysicalPlanGenerator { pub(crate) fn create_physical_create_table(&self, op: LogicalCreateTable) -> PhysicalOperator { - PhysicalOperator::PhysicalCreateTable(PhysicalCreateTable::new(op.info)) + if let Some(query) = op.info.query.clone() { + // create table as select + let query_plan = self.create_plan(*query); + let base = PhysicalOperatorBase::new(vec![query_plan], vec![]); + 
PhysicalOperator::PhysicalInsert(Box::new(PhysicalInsert::new_create_table_as( + base, op.info, + ))) + } else { + // create table + PhysicalOperator::PhysicalCreateTable(PhysicalCreateTable::new(op.info)) + } } } diff --git a/src/execution/physical_plan/physical_insert.rs b/src/execution/physical_plan/physical_insert.rs index 8007daf..7ad40ff 100644 --- a/src/execution/physical_plan/physical_insert.rs +++ b/src/execution/physical_plan/physical_insert.rs @@ -1,19 +1,20 @@ -use derive_new::new; - use super::{PhysicalOperator, PhysicalOperatorBase}; use crate::catalog_v2::TableCatalogEntry; use crate::execution::PhysicalPlanGenerator; -use crate::planner_v2::LogicalInsert; +use crate::planner_v2::{BoundCreateTableInfo, LogicalInsert}; use crate::types_v2::LogicalType; -#[derive(new, Clone)] +#[derive(Clone)] pub struct PhysicalInsert { pub(crate) base: PhysicalOperatorBase, /// The insertion map ([table_index -> index in result, or INVALID_INDEX if not specified]) pub(crate) column_index_list: Vec<usize>, /// The expected types for the INSERT statement pub(crate) expected_types: Vec<LogicalType>, - pub(crate) table: TableCatalogEntry, + /// The table to insert into; `None` for CREATE TABLE AS + pub(crate) table: Option<TableCatalogEntry>, + /// For create table as statement + pub(crate) create_table_info: Option<BoundCreateTableInfo>, } impl PhysicalInsert { @@ -23,6 +24,35 @@ impl PhysicalInsert { column_index_list: self.column_index_list.clone(), expected_types: self.expected_types.clone(), table: self.table.clone(), + create_table_info: self.create_table_info.clone(), + } + } + + pub fn new_insert_into( + base: PhysicalOperatorBase, + column_index_list: Vec<usize>, + expected_types: Vec<LogicalType>, + table: TableCatalogEntry, + ) -> Self { + Self { + base, + column_index_list, + expected_types, + table: Some(table), + create_table_info: None, + } + } + + pub fn new_create_table_as( + base: PhysicalOperatorBase, + create_table_info: BoundCreateTableInfo, + ) -> Self { + Self { + base, + column_index_list: vec![], + expected_types: 
vec![], + table: None, + create_table_info: Some(create_table_info), } } } @@ -30,11 +60,11 @@ impl PhysicalInsert { impl PhysicalPlanGenerator { pub(crate) fn create_physical_insert(&self, op: LogicalInsert) -> PhysicalOperator { let base = self.create_physical_operator_base(op.base); - PhysicalOperator::PhysicalInsert(PhysicalInsert::new( + PhysicalOperator::PhysicalInsert(Box::new(PhysicalInsert::new_insert_into( base, op.column_index_list, op.expected_types, op.table, - )) + ))) } } diff --git a/src/execution/volcano_executor/create_table.rs b/src/execution/volcano_executor/create_table.rs index 31b79b9..2403d61 100644 --- a/src/execution/volcano_executor/create_table.rs +++ b/src/execution/volcano_executor/create_table.rs @@ -9,6 +9,7 @@ use futures_async_stream::try_stream; use crate::catalog_v2::{Catalog, DataTable, DataTableInfo}; use crate::execution::{ExecutionContext, ExecutorError, PhysicalCreateTable}; +use crate::planner_v2::BoundCreateTableInfo; use crate::storage_v2::LocalStorage; #[derive(new)] @@ -17,11 +18,13 @@ pub struct CreateTable { } impl CreateTable { - #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] - pub async fn execute(self, context: Arc) { - let schema = self.plan.info.base.base.schema; - let table = self.plan.info.base.table; - let column_definitions = self.plan.info.base.columns; + pub fn create_table( + context: Arc, + info: &BoundCreateTableInfo, + ) -> Result { + let schema = info.base.base.schema.clone(); + let table = info.base.table.clone(); + let column_definitions = info.base.columns.clone(); let data_table = DataTable::new( DataTableInfo::new(schema.clone(), table.clone()), column_definitions, @@ -29,10 +32,17 @@ impl CreateTable { Catalog::create_table( context.clone_client_context(), schema, - table.clone(), + table, data_table.clone(), )?; LocalStorage::init_table(context.clone_client_context(), &data_table); + Ok(data_table) + } + + #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] + pub async 
fn execute(self, context: Arc) { + let table = self.plan.info.base.table.clone(); + Self::create_table(context, &self.plan.info)?; let array = Arc::new(StringArray::from(vec![format!("CREATE TABLE {}", table)])); let fields = vec![Field::new("success", DataType::Utf8, false)]; yield RecordBatch::try_new( diff --git a/src/execution/volcano_executor/insert.rs b/src/execution/volcano_executor/insert.rs index ed44c3f..c98131f 100644 --- a/src/execution/volcano_executor/insert.rs +++ b/src/execution/volcano_executor/insert.rs @@ -6,6 +6,8 @@ use arrow::record_batch::RecordBatch; use derive_new::new; use futures_async_stream::try_stream; +use super::CreateTable; +use crate::catalog_v2::DataTable; use crate::execution::{ BoxedExecutor, ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalInsert, }; @@ -23,12 +25,13 @@ pub struct Insert { } impl Insert { - #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] - pub async fn execute(self, context: Arc) { - let table = self.plan.table.storage; + fn insert_into_bound_info( + plan: PhysicalInsert, + ) -> Result<(DataTable, Arc, Vec), ExecutorError> { + let table = plan.table.unwrap().storage; let mut exprs = vec![]; let mut fields = vec![]; - for (table_col_idx, col_insert_idx) in self.plan.column_index_list.iter().enumerate() { + for (table_col_idx, col_insert_idx) in plan.column_index_list.iter().enumerate() { let column = table.column_definitions[table_col_idx].clone(); fields.push(Field::new( column.name.as_str(), @@ -51,6 +54,39 @@ impl Insert { } } let schema = SchemaRef::new(Schema::new_with_metadata(fields.clone(), HashMap::new())); + Ok((table, schema, exprs)) + } + + pub fn create_table_bound_info( + table: DataTable, + ) -> Result<(DataTable, Arc, Vec), ExecutorError> { + let mut exprs = vec![]; + let mut fields = vec![]; + for (idx, column) in table.column_definitions.iter().enumerate() { + fields.push(Field::new( + column.name.as_str(), + column.ty.clone().into(), + true, + )); + let base = 
BoundExpressionBase::new("".to_string(), column.ty.clone()); + exprs.push(BoundExpression::BoundReferenceExpression( + BoundReferenceExpression::new(base, idx), + )); + } + let schema = SchemaRef::new(Schema::new_with_metadata(fields.clone(), HashMap::new())); + Ok((table, schema, exprs)) + } + + #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] + pub async fn execute(self, context: Arc) { + let (table, schema, exprs) = if let Some(create_table_info) = self.plan.create_table_info { + // create table as + let table = CreateTable::create_table(context.clone(), &create_table_info)?; + Self::create_table_bound_info(table)? + } else { + // insert into + Self::insert_into_bound_info(self.plan)? + }; #[for_await] for batch in self.child { let batch = batch?; diff --git a/src/execution/volcano_executor/mod.rs b/src/execution/volcano_executor/mod.rs index 705e7b0..c53f2de 100644 --- a/src/execution/volcano_executor/mod.rs +++ b/src/execution/volcano_executor/mod.rs @@ -45,7 +45,7 @@ impl VolcanoExecutor { PhysicalOperator::PhysicalInsert(op) => { let child = op.base.children.first().unwrap().clone(); let child_executor = self.build(child, context.clone()); - Insert::new(op, child_executor).execute(context) + Insert::new(*op, child_executor).execute(context) } PhysicalOperator::PhysicalTableScan(op) => TableScan::new(op).execute(context), PhysicalOperator::PhysicalProjection(op) => { diff --git a/src/planner_v2/binder/query_node/bind_select_node.rs b/src/planner_v2/binder/query_node/bind_select_node.rs index 592c37e..641cf42 100644 --- a/src/planner_v2/binder/query_node/bind_select_node.rs +++ b/src/planner_v2/binder/query_node/bind_select_node.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use derive_new::new; -use sqlparser::ast::{Ident, Query}; +use sqlparser::ast::{Ident, SetExpr}; use super::BoundResultModifier; use crate::planner_v2::{ @@ -34,9 +34,9 @@ pub struct BoundSelectNode { } impl Binder { - pub fn bind_select_node(&mut self, select_node: &Query) 
-> Result { + pub fn bind_query_body(&mut self, query_body: &SetExpr) -> Result { let projection_index = self.generate_table_index(); - let mut bound_select_node = match &*select_node.body { + let mut bound_select_node = match query_body { sqlparser::ast::SetExpr::Select(select) => self.bind_select_body(select)?, sqlparser::ast::SetExpr::Query(_) => todo!(), sqlparser::ast::SetExpr::SetOperation { .. } => todo!(), diff --git a/src/planner_v2/binder/sqlparser_util.rs b/src/planner_v2/binder/sqlparser_util.rs index a9689dd..7f5f3e3 100644 --- a/src/planner_v2/binder/sqlparser_util.rs +++ b/src/planner_v2/binder/sqlparser_util.rs @@ -206,8 +206,8 @@ impl SqlparserQueryBuilder { } } - pub fn build(self) -> Query { - Query { + pub fn build(self) -> Box { + Box::new(Query { with: None, body: self.body, order_by: vec![], @@ -215,7 +215,7 @@ impl SqlparserQueryBuilder { offset: None, fetch: None, lock: None, - } + }) } } diff --git a/src/planner_v2/binder/statement/bind_create.rs b/src/planner_v2/binder/statement/bind_create.rs index 4f2258c..2147b0e 100644 --- a/src/planner_v2/binder/statement/bind_create.rs +++ b/src/planner_v2/binder/statement/bind_create.rs @@ -1,3 +1,4 @@ +use itertools::Itertools; use sqlparser::ast::Statement; use super::BoundStatement; @@ -9,15 +10,35 @@ use crate::planner_v2::{ use crate::types_v2::LogicalType; impl Binder { - pub fn bind_create_table(&self, stmt: &Statement) -> Result { + pub fn bind_create_table(&mut self, stmt: &Statement) -> Result { match stmt { - Statement::CreateTable { name, columns, .. } => { + Statement::CreateTable { + name, + columns, + query, + .. 
+ } => { let (schema, table) = SqlparserResolver::object_name_to_schema_table(name)?; - let column_definitions = columns - .iter() - .map(SqlparserResolver::column_def_to_column_definition) - .try_collect()?; - let bound_info = BoundCreateTableInfo::new(schema, table, column_definitions); + let (column_definitions, query) = if let Some(query) = query { + // create table columns based on query names and types + let select = self.bind_query(query)?; + let cols = select + .names + .into_iter() + .zip_eq(select.types.into_iter()) + .map(|(name, ty)| ColumnDefinition::new(name, ty)) + .collect::<Vec<_>>(); + (cols, Some(Box::new(select.plan))) + } else { + // create table columns based on input column_def + let cols = columns + .iter() + .map(SqlparserResolver::column_def_to_column_definition) + .try_collect()?; + (cols, None) + }; + let bound_info = + BoundCreateTableInfo::new(schema, table, column_definitions, query); let plan = LogicalOperator::LogicalCreateTable(LogicalCreateTable::new(bound_info)); Ok(BoundStatement::new( plan, @@ -33,14 +54,29 @@ impl Binder { #[derive(Debug, Clone)] pub struct BoundCreateTableInfo { pub(crate) base: CreateTableInfo, + /// CREATE TABLE from QUERY + pub(crate) query: Option<Box<LogicalOperator>>, } impl BoundCreateTableInfo { - pub fn new(schema: String, table: String, column_definitions: Vec<ColumnDefinition>) -> Self { + pub fn new( + schema: String, + table: String, + column_definitions: Vec<ColumnDefinition>, + query: Option<Box<LogicalOperator>>, + ) -> Self { let base = CreateInfoBase::new(schema); let create_table_info = CreateTableInfo::new(base, table, column_definitions); Self { base: create_table_info, + query, + } + } + + pub fn new_create_as_query(base: CreateTableInfo, query: LogicalOperator) -> Self { + Self { + base, + query: Some(Box::new(query)), + } + } } } diff --git a/src/planner_v2/binder/statement/bind_explain_table.rs b/src/planner_v2/binder/statement/bind_explain_table.rs index 024967c..f56a7f9 100644 --- a/src/planner_v2/binder/statement/bind_explain_table.rs +++ 
b/src/planner_v2/binder/statement/bind_explain_table.rs @@ -25,8 +25,7 @@ impl Binder { .selection_col_eq_string("table_name", table_name.as_str()) .build(); let query = SqlparserQueryBuilder::new_from_select(select).build(); - let node = self.bind_select_node(&query)?; - self.create_plan_for_select_node(node) + self.bind_query(&query) } _ => Err(BindError::UnsupportedStmt(format!("{:?}", stmt))), } diff --git a/src/planner_v2/binder/statement/bind_insert.rs b/src/planner_v2/binder/statement/bind_insert.rs index 2246dad..01275de 100644 --- a/src/planner_v2/binder/statement/bind_insert.rs +++ b/src/planner_v2/binder/statement/bind_insert.rs @@ -81,10 +81,9 @@ impl Binder { } } - let select_node = self.bind_select_node(source)?; + let select_node = self.bind_query(source)?; let expected_columns_cnt = named_column_indices.len(); - let select_node = self.create_plan_for_select_node(select_node)?; let inserted_types = select_node.types; let mut plan = select_node.plan; Self::check_insert_column_count_mismatch( diff --git a/src/planner_v2/binder/statement/bind_select.rs b/src/planner_v2/binder/statement/bind_select.rs index 117f927..32f0769 100644 --- a/src/planner_v2/binder/statement/bind_select.rs +++ b/src/planner_v2/binder/statement/bind_select.rs @@ -1,19 +1,21 @@ -use sqlparser::ast::Statement; +use sqlparser::ast::{Query, Statement}; use super::BoundStatement; use crate::planner_v2::{BindError, Binder}; impl Binder { - pub fn bind_select(&mut self, stmt: &Statement) -> Result { + pub fn bind_query_stmt(&mut self, stmt: &Statement) -> Result { match stmt { - Statement::Query(query) => { - let mut node = self.bind_select_node(query)?; - if let Some(limit_modifier) = self.bind_limit_modifier(query)? 
{ - node.modifiers.push(limit_modifier); - } - self.create_plan_for_select_node(node) - } + Statement::Query(query) => self.bind_query(query), _ => Err(BindError::UnsupportedStmt(format!("{:?}", stmt))), } } + + pub fn bind_query(&mut self, query: &Query) -> Result { + let mut node = self.bind_query_body(&query.body)?; + if let Some(limit_modifier) = self.bind_limit_modifier(query)? { + node.modifiers.push(limit_modifier); + } + self.create_plan_for_select_node(node) + } } diff --git a/src/planner_v2/binder/statement/bind_show_tables.rs b/src/planner_v2/binder/statement/bind_show_tables.rs index 22fcdd2..818dab9 100644 --- a/src/planner_v2/binder/statement/bind_show_tables.rs +++ b/src/planner_v2/binder/statement/bind_show_tables.rs @@ -12,8 +12,7 @@ impl Binder { .from_table_function("sqlrs_tables") .build(); let query = SqlparserQueryBuilder::new_from_select(select).build(); - let node = self.bind_select_node(&query)?; - self.create_plan_for_select_node(node) + self.bind_query(&query) } _ => Err(BindError::UnsupportedStmt(format!("{:?}", stmt))), } diff --git a/src/planner_v2/binder/statement/mod.rs b/src/planner_v2/binder/statement/mod.rs index 23c8c08..a55919d 100644 --- a/src/planner_v2/binder/statement/mod.rs +++ b/src/planner_v2/binder/statement/mod.rs @@ -26,7 +26,7 @@ impl Binder { match statement { Statement::CreateTable { .. } => self.bind_create_table(statement), Statement::Insert { .. } => self.bind_insert(statement), - Statement::Query { .. } => self.bind_select(statement), + Statement::Query { .. } => self.bind_query_stmt(statement), Statement::Explain { .. } => self.bind_explain(statement), Statement::ShowTables { .. } => self.bind_show_tables(statement), Statement::ExplainTable { .. 
} => self.bind_explain_table(statement), diff --git a/tests/slt/create_table.slt b/tests/slt/create_table.slt index 3b5a64f..b93d996 100644 --- a/tests/slt/create_table.slt +++ b/tests/slt/create_table.slt @@ -25,3 +25,14 @@ onlyif sqlrs_v2 statement ok create table t4(v1 int); select v1 from t4; + + +onlyif sqlrs_v2 +statement ok +create table read_csv_table as select * from read_csv('tests/csv/t2.csv'); + +onlyif sqlrs_v2 +query I +select a from read_csv_table limit 1; +---- +10