Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::main_entry::ClientContext;
pub enum FunctionData {
SeqTableScanInputData(Box<SeqTableScanInputData>),
SqlrsTablesData(Box<SqlrsTablesData>),
Placeholder,
SqlrsColumnsData(Box<SqlrsColumnsData>),
}

#[derive(new)]
Expand All @@ -34,6 +34,8 @@ impl BuiltinFunctions {
}

pub fn initialize(&mut self) -> Result<(), FunctionError> {
SqlrsTablesFunc::register_function(self)
SqlrsTablesFunc::register_function(self)?;
SqlrsColumnsFunc::register_function(self)?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/function/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod seq_table_scan;
mod sqlrs_columns;
mod sqlrs_tables;
mod table_function;
pub use seq_table_scan::*;
pub use sqlrs_columns::*;
pub use sqlrs_tables::*;
pub use table_function::*;
123 changes: 123 additions & 0 deletions src/function/table/sqlrs_columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use derive_new::new;
use itertools::Itertools;

use super::{TableFunction, TableFunctionBindInput, TableFunctionInput};
use crate::catalog_v2::{Catalog, CatalogEntry, DEFAULT_SCHEMA};
use crate::execution::SchemaUtil;
use crate::function::{BuiltinFunctions, FunctionData, FunctionError};
use crate::main_entry::ClientContext;
use crate::types_v2::{LogicalType, ScalarValue};

pub struct SqlrsColumnsFunc;

#[derive(new, Debug, Clone)]
pub struct SqlrsColumnsData {
pub(crate) entries: Vec<CatalogEntry>,
pub(crate) return_types: Vec<LogicalType>,
pub(crate) return_names: Vec<String>,
pub(crate) current_cursor: usize,
}

impl SqlrsColumnsFunc {
fn generate_sqlrs_tables_names() -> Vec<String> {
vec![
"table_name".to_string(),
"column_names".to_string(),
"column_types".to_string(),
]
}

fn generate_sqlrs_tables_types() -> Vec<LogicalType> {
vec![
LogicalType::Varchar,
LogicalType::Varchar,
LogicalType::Varchar,
]
}

fn bind_func(
context: Arc<ClientContext>,
_input: TableFunctionBindInput,
return_types: &mut Vec<LogicalType>,
return_names: &mut Vec<String>,
) -> Result<Option<FunctionData>, FunctionError> {
let entries = Catalog::scan_entries(context, DEFAULT_SCHEMA.to_string(), &|entry| {
matches!(entry, CatalogEntry::TableCatalogEntry(_))
})?;
let data = SqlrsColumnsData::new(
entries,
Self::generate_sqlrs_tables_types(),
Self::generate_sqlrs_tables_names(),
0,
);
return_types.extend(data.return_types.clone());
return_names.extend(data.return_names.clone());
Ok(Some(FunctionData::SqlrsColumnsData(Box::new(data))))
}

fn tables_func(
_context: Arc<ClientContext>,
input: &mut TableFunctionInput,
) -> Result<Option<RecordBatch>, FunctionError> {
if input.bind_data.is_none() {
return Ok(None);
}

let bind_data = input.bind_data.as_mut().unwrap();
if let FunctionData::SqlrsColumnsData(data) = bind_data {
if data.current_cursor >= data.entries.len() {
return Ok(None);
}

let schema = SchemaUtil::new_schema_ref(&data.return_names, &data.return_types);
let mut table_name = ScalarValue::new_builder(&LogicalType::Varchar)?;
let mut column_names = ScalarValue::new_builder(&LogicalType::Varchar)?;
let mut column_types = ScalarValue::new_builder(&LogicalType::Varchar)?;
for entry in data.entries.iter() {
if let CatalogEntry::TableCatalogEntry(table) = entry {
ScalarValue::append_for_builder(
&ScalarValue::Utf8(Some(table.base.name.clone())),
&mut table_name,
)?;

let names = table.columns.iter().map(|col| col.name.clone()).join(", ");
let names = format!("[{}]", names);
let types = table.columns.iter().map(|col| col.ty.clone()).join(", ");
let types = format!("[{}]", types);
ScalarValue::append_for_builder(
&ScalarValue::Utf8(Some(names)),
&mut column_names,
)?;
ScalarValue::append_for_builder(
&ScalarValue::Utf8(Some(types)),
&mut column_types,
)?;
}
}
let cols = vec![
table_name.finish(),
column_names.finish(),
column_types.finish(),
];
data.current_cursor += data.entries.len();
let batch = RecordBatch::try_new(schema, cols)?;
Ok(Some(batch))
} else {
Err(FunctionError::InternalError(
"unexpected global state type".to_string(),
))
}
}

pub fn register_function(set: &mut BuiltinFunctions) -> Result<(), FunctionError> {
set.add_table_functions(TableFunction::new(
"sqlrs_columns".to_string(),
Some(Self::bind_func),
Self::tables_func,
))?;
Ok(())
}
}
34 changes: 34 additions & 0 deletions src/planner_v2/binder/statement/bind_explain_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use sqlparser::ast::Statement;

use super::BoundStatement;
use crate::planner_v2::{
BindError, Binder, SqlparserQueryBuilder, SqlparserResolver, SqlparserSelectBuilder,
};

impl Binder {
pub fn bind_explain_table(&mut self, stmt: &Statement) -> Result<BoundStatement, BindError> {
match stmt {
Statement::ExplainTable {
describe_alias,
table_name,
..
} => {
if !*describe_alias {
return Err(BindError::UnsupportedStmt(
"Only support describe table statement".to_string(),
));
}
let (_, _table_name) = SqlparserResolver::object_name_to_schema_table(table_name)?;
// FIXME: support filter table_name
let select = SqlparserSelectBuilder::default()
.projection_wildcard()
.from_table_function("sqlrs_columns")
.build();
let query = SqlparserQueryBuilder::new_from_select(select).build();
let node = self.bind_select_node(&query)?;
self.create_plan_for_select_node(node)
}
_ => Err(BindError::UnsupportedStmt(format!("{:?}", stmt))),
}
}
}
5 changes: 2 additions & 3 deletions src/planner_v2/binder/statement/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
mod bind_create;
mod bind_explain;
mod bind_explain_table;
mod bind_insert;
mod bind_select;
mod bind_show_tables;

pub use bind_create::*;
pub use bind_insert::*;
pub use bind_select::*;
pub use bind_show_tables::*;
use derive_new::new;
use sqlparser::ast::Statement;

Expand All @@ -30,6 +28,7 @@ impl Binder {
Statement::Query { .. } => self.bind_select(statement),
Statement::Explain { .. } => self.bind_explain(statement),
Statement::ShowTables { .. } => self.bind_show_tables(statement),
Statement::ExplainTable { .. } => self.bind_explain_table(statement),
_ => Err(BindError::UnsupportedStmt(format!("{:?}", statement))),
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/types_v2/types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use strum_macros::AsRefStr;

use super::TypeError;

/// Sqlrs type conversion:
/// sqlparser::ast::DataType -> LogicalType -> arrow::datatypes::DataType
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, AsRefStr)]
pub enum LogicalType {
Invalid,
SqlNull,
Expand Down Expand Up @@ -231,3 +233,9 @@ impl From<LogicalType> for arrow::datatypes::DataType {
}
}
}

impl std::fmt::Display for LogicalType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_ref())
}
}
2 changes: 1 addition & 1 deletion src/util/tree_render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl TreeRender {
input.bind_table.storage.info.table
)
}
FunctionData::Placeholder => todo!(),
FunctionData::SqlrsColumnsData(_) => "sqlrs_columns".to_string(),
FunctionData::SqlrsTablesData(_) => "sqlrs_tables".to_string(),
},
None => "None".to_string(),
Expand Down
6 changes: 6 additions & 0 deletions tests/slt/table_function.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ query III
select schema_name, schema_oid, table_name from sqlrs_tables();
----
main 1 t1

onlyif sqlrs_v2
query III
select * from sqlrs_columns();
----
t1 [v1, v2, v3] [Integer, Integer, Integer]