Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/catalog_v2/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl Catalog {

pub fn scan_entries<F>(
client_context: Arc<ClientContext>,
schema: String,
callback: &F,
) -> Result<Vec<CatalogEntry>, CatalogError>
where
Expand All @@ -92,7 +93,10 @@ impl Catalog {
Ok(c) => c,
Err(_) => return Err(CatalogError::CatalogLockedError),
};
Ok(catalog.schemas.scan_entries(callback))
if let CatalogEntry::SchemaCatalogEntry(entry) = catalog.schemas.get_entry(schema)? {
return Ok(entry.scan_entries(callback));
}
Err(CatalogError::CatalogEntryTypeNotMatch)
}

pub fn get_table_function(
Expand Down
10 changes: 10 additions & 0 deletions src/catalog_v2/entry/schema_catalog_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,14 @@ impl SchemaCatalogEntry {
_ => Err(CatalogError::CatalogEntryNotExists(table_function)),
}
}

pub fn scan_entries<F>(&self, callback: &F) -> Vec<CatalogEntry>
where
F: Fn(&CatalogEntry) -> bool,
{
let mut result = vec![];
result.extend(self.tables.scan_entries(callback));
result.extend(self.functions.scan_entries(callback));
result
}
}
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::types_v2::LogicalType;
pub struct PhysicalTableScan {
pub(crate) base: PhysicalOperatorBase,
pub(crate) function: TableFunction,
pub(crate) bind_data: FunctionData,
pub(crate) bind_data: Option<FunctionData>,
/// The types of ALL columns that can be returned by the table function
pub(crate) returned_types: Vec<LogicalType>,
/// The names of ALL columns that can be returned by the table function
Expand Down
20 changes: 6 additions & 14 deletions src/execution/volcano_executor/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{ExecutionContext, ExecutorError, PhysicalTableScan, SchemaUtil};
use crate::function::{
GlobalTableFunctionState, SeqTableScanInitInput, TableFunctionInitInput, TableFunctionInput,
};
use crate::function::TableFunctionInput;

#[derive(new)]
pub struct TableScan {
Expand All @@ -21,22 +19,16 @@ impl TableScan {

let bind_data = self.plan.bind_data;

let table_scan_func = self.plan.function.function;
let global_state = if let Some(init_global_func) = self.plan.function.init_global {
let seq_table_scan_init_input = TableFunctionInitInput::SeqTableScanInitInput(
Box::new(SeqTableScanInitInput::new(bind_data.clone())),
);
init_global_func(context.clone_client_context(), seq_table_scan_init_input)?
} else {
GlobalTableFunctionState::None
};
let function = self.plan.function;
let table_scan_func = function.function;
let mut tabel_scan_input = TableFunctionInput::new(bind_data);

let mut tabel_scan_input = TableFunctionInput::new(bind_data, global_state);
while let Some(batch) =
table_scan_func(context.clone_client_context(), &mut tabel_scan_input)?
{
let columns = batch.columns().to_vec();
yield RecordBatch::try_new(schema.clone(), columns)?
let try_new = RecordBatch::try_new(schema.clone(), columns)?;
yield try_new
}
}
}
30 changes: 29 additions & 1 deletion src/function/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,39 @@
mod errors;
mod table;

use std::sync::Arc;

use derive_new::new;
pub use errors::*;
pub use table::*;

use crate::catalog_v2::{Catalog, DEFAULT_SCHEMA};
use crate::common::{CreateInfoBase, CreateTableFunctionInfo};
use crate::main_entry::ClientContext;

#[derive(Debug, Clone)]
pub enum FunctionData {
SeqTableScanInputData(Box<SeqTableScanInputData>),
None,
SqlrsTablesData(Box<SqlrsTablesData>),
Placeholder,
}

#[derive(new)]
pub struct BuiltinFunctions {
pub(crate) context: Arc<ClientContext>,
}

impl BuiltinFunctions {
pub fn add_table_functions(&mut self, function: TableFunction) -> Result<(), FunctionError> {
let info = CreateTableFunctionInfo::new(
CreateInfoBase::new(DEFAULT_SCHEMA.to_string()),
function.name.clone(),
vec![function],
);
Ok(Catalog::create_table_function(self.context.clone(), info)?)
}

pub fn initialize(&mut self) -> Result<(), FunctionError> {
SqlrsTablesFunc::register_function(self)
}
}
2 changes: 2 additions & 0 deletions src/function/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod seq_table_scan;
mod sqlrs_tables;
mod table_function;
pub use seq_table_scan::*;
pub use sqlrs_tables::*;
pub use table_function::*;
43 changes: 19 additions & 24 deletions src/function/table/seq_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::catalog_v2::TableCatalogEntry;
use crate::function::{FunctionData, FunctionError};
use crate::main_entry::ClientContext;
use crate::storage_v2::{LocalStorage, LocalStorageReader};
use crate::types_v2::LogicalType;

/// The table scan function represents a sequential scan over one of base tables.
pub struct SeqTableScan;
Expand All @@ -18,23 +19,15 @@ pub struct SeqTableScanInputData {
pub(crate) local_storage_reader: LocalStorageReader,
}

#[derive(new)]
pub struct SeqTableScanBindInput {
pub(crate) bind_table: TableCatalogEntry,
}

#[derive(new)]
pub struct SeqTableScanInitInput {
#[allow(dead_code)]
pub(crate) bind_data: FunctionData,
}

impl SeqTableScan {
fn seq_table_scan_bind_func(
#[allow(clippy::ptr_arg)]
fn bind_func(
_context: Arc<ClientContext>,
input: TableFunctionBindInput,
_return_types: &mut Vec<LogicalType>,
_return_names: &mut Vec<String>,
) -> Result<Option<FunctionData>, FunctionError> {
if let TableFunctionBindInput::SeqTableScanBindInput(bind_input) = input {
let table = bind_input.bind_table;
if let Some(table) = input.bind_table {
let res = FunctionData::SeqTableScanInputData(Box::new(SeqTableScanInputData::new(
table.clone(),
LocalStorage::create_reader(&table.storage),
Expand All @@ -47,26 +40,28 @@ impl SeqTableScan {
}
}

fn seq_table_scan_func(
fn scan_func(
context: Arc<ClientContext>,
input: &mut TableFunctionInput,
) -> Result<Option<RecordBatch>, FunctionError> {
if let FunctionData::SeqTableScanInputData(data) = &mut input.bind_data {
let batch = data.local_storage_reader.next_batch(context);
Ok(batch)
if let Some(bind_data) = &mut input.bind_data {
if let FunctionData::SeqTableScanInputData(data) = bind_data {
Ok(data.local_storage_reader.next_batch(context))
} else {
Err(FunctionError::InternalError(
"unexpected bind data type".to_string(),
))
}
} else {
Err(FunctionError::InternalError(
"unexpected bind data type".to_string(),
))
Ok(None)
}
}

pub fn get_function() -> TableFunction {
TableFunction::new(
"seq_table_scan".to_string(),
Some(Self::seq_table_scan_bind_func),
None,
Self::seq_table_scan_func,
Some(Self::bind_func),
Self::scan_func,
)
}
}
125 changes: 125 additions & 0 deletions src/function/table/sqlrs_tables.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use derive_new::new;

use super::{TableFunction, TableFunctionBindInput, TableFunctionInput};
use crate::catalog_v2::{Catalog, CatalogEntry, DEFAULT_SCHEMA};
use crate::execution::SchemaUtil;
use crate::function::{BuiltinFunctions, FunctionData, FunctionError};
use crate::main_entry::ClientContext;
use crate::types_v2::{LogicalType, ScalarValue};

pub struct SqlrsTablesFunc;

#[derive(new, Debug, Clone)]
pub struct SqlrsTablesData {
pub(crate) entries: Vec<CatalogEntry>,
pub(crate) return_types: Vec<LogicalType>,
pub(crate) return_names: Vec<String>,
pub(crate) current_cursor: usize,
}

impl SqlrsTablesFunc {
fn generate_sqlrs_tables_names() -> Vec<String> {
vec![
"schema_name".to_string(),
"schema_oid".to_string(),
"table_name".to_string(),
"table_oid".to_string(),
]
}

fn generate_sqlrs_tables_types() -> Vec<LogicalType> {
vec![
LogicalType::Varchar,
LogicalType::Integer,
LogicalType::Varchar,
LogicalType::Integer,
]
}

fn bind_func(
context: Arc<ClientContext>,
_input: TableFunctionBindInput,
return_types: &mut Vec<LogicalType>,
return_names: &mut Vec<String>,
) -> Result<Option<FunctionData>, FunctionError> {
let entries = Catalog::scan_entries(context, DEFAULT_SCHEMA.to_string(), &|entry| {
matches!(entry, CatalogEntry::TableCatalogEntry(_))
})?;
let data = SqlrsTablesData::new(
entries,
Self::generate_sqlrs_tables_types(),
Self::generate_sqlrs_tables_names(),
0,
);
return_types.extend(data.return_types.clone());
return_names.extend(data.return_names.clone());
Ok(Some(FunctionData::SqlrsTablesData(Box::new(data))))
}

fn tables_func(
_context: Arc<ClientContext>,
input: &mut TableFunctionInput,
) -> Result<Option<RecordBatch>, FunctionError> {
if input.bind_data.is_none() {
return Ok(None);
}

let bind_data = input.bind_data.as_mut().unwrap();
if let FunctionData::SqlrsTablesData(data) = bind_data {
if data.current_cursor >= data.entries.len() {
return Ok(None);
}

let schema = SchemaUtil::new_schema_ref(&data.return_names, &data.return_types);
let mut schema_names = ScalarValue::new_builder(&LogicalType::Varchar)?;
let mut schema_oids = ScalarValue::new_builder(&LogicalType::Integer)?;
let mut table_names = ScalarValue::new_builder(&LogicalType::Varchar)?;
let mut table_oids = ScalarValue::new_builder(&LogicalType::Integer)?;
for entry in data.entries.iter() {
if let CatalogEntry::TableCatalogEntry(table) = entry {
ScalarValue::append_for_builder(
&ScalarValue::Utf8(Some(table.schema_base.name.clone())),
&mut schema_names,
)?;
ScalarValue::append_for_builder(
&ScalarValue::Int32(Some(table.schema_base.oid as i32)),
&mut schema_oids,
)?;
ScalarValue::append_for_builder(
&ScalarValue::Utf8(Some(table.base.name.clone())),
&mut table_names,
)?;
ScalarValue::append_for_builder(
&ScalarValue::Int32(Some(table.base.oid as i32)),
&mut table_oids,
)?;
}
}
let cols = vec![
schema_names.finish(),
schema_oids.finish(),
table_names.finish(),
table_oids.finish(),
];
data.current_cursor += data.entries.len();
let batch = RecordBatch::try_new(schema, cols)?;
Ok(Some(batch))
} else {
Err(FunctionError::InternalError(
"unexpected global state type".to_string(),
))
}
}

pub fn register_function(set: &mut BuiltinFunctions) -> Result<(), FunctionError> {
set.add_table_functions(TableFunction::new(
"sqlrs_tables".to_string(),
Some(Self::bind_func),
Self::tables_func,
))?;
Ok(())
}
}
Loading