Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion src/catalog_v2/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;

use super::entry::{CatalogEntry, DataTable};
use super::{CatalogError, CatalogSet, TableCatalogEntry};
use super::{CatalogError, CatalogSet, TableCatalogEntry, TableFunctionCatalogEntry};
use crate::common::CreateTableFunctionInfo;
use crate::main_entry::ClientContext;

/// The Catalog object represents the catalog of the database.
Expand Down Expand Up @@ -57,4 +58,55 @@ impl Catalog {
}
Err(CatalogError::CatalogEntryTypeNotMatch)
}

pub fn create_table_function(
client_context: Arc<ClientContext>,
info: CreateTableFunctionInfo,
) -> Result<(), CatalogError> {
let mut catalog = match client_context.db.catalog.try_write() {
Ok(c) => c,
Err(_) => return Err(CatalogError::CatalogLockedError),
};
if let CatalogEntry::SchemaCatalogEntry(mut entry) =
catalog.schemas.get_entry(info.base.schema.clone())?
{
catalog.catalog_version += 1;
let schema = info.base.schema.clone();
entry.create_table_function(catalog.catalog_version, info)?;
catalog
.schemas
.replace_entry(schema, CatalogEntry::SchemaCatalogEntry(entry))?;
return Ok(());
}
Err(CatalogError::CatalogEntryTypeNotMatch)
}

pub fn scan_entries<F>(
client_context: Arc<ClientContext>,
callback: &F,
) -> Result<Vec<CatalogEntry>, CatalogError>
where
F: Fn(&CatalogEntry) -> bool,
{
let catalog = match client_context.db.catalog.try_read() {
Ok(c) => c,
Err(_) => return Err(CatalogError::CatalogLockedError),
};
Ok(catalog.schemas.scan_entries(callback))
}

pub fn get_table_function(
client_context: Arc<ClientContext>,
schema: String,
table_function: String,
) -> Result<TableFunctionCatalogEntry, CatalogError> {
let catalog = match client_context.db.catalog.try_read() {
Ok(c) => c,
Err(_) => return Err(CatalogError::CatalogLockedError),
};
if let CatalogEntry::SchemaCatalogEntry(entry) = catalog.schemas.get_entry(schema)? {
return entry.get_table_function(table_function);
}
Err(CatalogError::CatalogEntryTypeNotMatch)
}
}
13 changes: 13 additions & 0 deletions src/catalog_v2/catalog_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,17 @@ impl CatalogSet {
}
Err(CatalogError::CatalogEntryNotExists(name))
}

pub fn scan_entries<F>(&self, callback: &F) -> Vec<CatalogEntry>
where
F: Fn(&CatalogEntry) -> bool,
{
let mut result = vec![];
for (_, entry) in self.entries.iter() {
if callback(entry) {
result.push(entry.clone());
}
}
result
}
}
7 changes: 5 additions & 2 deletions src/catalog_v2/entry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
mod schema_catalog_entry;
mod table_catalog_entry;
mod table_function_catalog_entry;

use derive_new::new;
pub use schema_catalog_entry::*;
pub use table_catalog_entry::*;
pub use table_function_catalog_entry::*;

#[derive(Clone, Debug)]
pub enum CatalogEntry {
SchemaCatalogEntry(SchemaCatalogEntry),
TableCatalogEntry(TableCatalogEntry),
TableFunctionCatalogEntry(TableFunctionCatalogEntry),
}

impl CatalogEntry {
Expand All @@ -21,7 +24,7 @@ impl CatalogEntry {
#[derive(new, Clone, Debug)]
pub struct CatalogEntryBase {
/// The object identifier of the entry
oid: usize,
pub(crate) oid: usize,
/// The name of the entry
name: String,
pub(crate) name: String,
}
37 changes: 34 additions & 3 deletions src/catalog_v2/entry/schema_catalog_entry.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use super::table_catalog_entry::{DataTable, TableCatalogEntry};
use super::{CatalogEntry, CatalogEntryBase};
use super::{CatalogEntry, CatalogEntryBase, TableFunctionCatalogEntry};
use crate::catalog_v2::{CatalogError, CatalogSet};
use crate::common::CreateTableFunctionInfo;

#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct SchemaCatalogEntry {
base: CatalogEntryBase,
tables: CatalogSet,
functions: CatalogSet,
}

impl SchemaCatalogEntry {
pub fn new(oid: usize, schema: String) -> Self {
Self {
base: CatalogEntryBase::new(oid, schema),
tables: CatalogSet::default(),
functions: CatalogSet::default(),
}
}

Expand All @@ -23,8 +26,12 @@ impl SchemaCatalogEntry {
table: String,
storage: DataTable,
) -> Result<(), CatalogError> {
let entry =
CatalogEntry::TableCatalogEntry(TableCatalogEntry::new(oid, table.clone(), storage));
let entry = CatalogEntry::TableCatalogEntry(TableCatalogEntry::new(
oid,
table.clone(),
self.base.clone(),
storage,
));
self.tables.create_entry(table, entry)?;
Ok(())
}
Expand All @@ -35,4 +42,28 @@ impl SchemaCatalogEntry {
_ => Err(CatalogError::CatalogEntryNotExists(table)),
}
}

pub fn create_table_function(
&mut self,
oid: usize,
info: CreateTableFunctionInfo,
) -> Result<(), CatalogError> {
let entry = TableFunctionCatalogEntry::new(
CatalogEntryBase::new(oid, info.name.clone()),
info.functions,
);
let entry = CatalogEntry::TableFunctionCatalogEntry(entry);
self.functions.create_entry(info.name, entry)?;
Ok(())
}

pub fn get_table_function(
&self,
table_function: String,
) -> Result<TableFunctionCatalogEntry, CatalogError> {
match self.functions.get_entry(table_function.clone())? {
CatalogEntry::TableFunctionCatalogEntry(e) => Ok(e),
_ => Err(CatalogError::CatalogEntryNotExists(table_function)),
}
}
}
9 changes: 8 additions & 1 deletion src/catalog_v2/entry/table_catalog_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::types_v2::LogicalType;
#[derive(Clone, Debug)]
pub struct TableCatalogEntry {
pub(crate) base: CatalogEntryBase,
pub(crate) schema_base: CatalogEntryBase,
pub(crate) storage: DataTable,
/// A list of columns that are part of this table
pub(crate) columns: Vec<ColumnDefinition>,
Expand All @@ -17,7 +18,12 @@ pub struct TableCatalogEntry {
}

impl TableCatalogEntry {
pub fn new(oid: usize, table: String, storage: DataTable) -> Self {
pub fn new(
oid: usize,
table: String,
schema_base: CatalogEntryBase,
storage: DataTable,
) -> Self {
let mut name_map = HashMap::new();
let mut columns = vec![];
storage
Expand All @@ -30,6 +36,7 @@ impl TableCatalogEntry {
});
Self {
base: CatalogEntryBase::new(oid, table),
schema_base,
storage,
columns,
name_map,
Expand Down
12 changes: 12 additions & 0 deletions src/catalog_v2/entry/table_function_catalog_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use derive_new::new;

use super::CatalogEntryBase;
use crate::function::TableFunction;

#[derive(new, Clone, Debug)]
pub struct TableFunctionCatalogEntry {
#[allow(dead_code)]
pub(crate) base: CatalogEntryBase,
#[allow(dead_code)]
pub(crate) functions: Vec<TableFunction>,
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use derive_new::new;

use crate::catalog_v2::ColumnDefinition;
use crate::function::TableFunction;

#[derive(new, Debug, Clone)]
pub struct CreateInfoBase {
pub(crate) schema: String,
}

#[derive(new, Debug, Clone)]
pub struct CreateTableInfo {
Expand All @@ -11,7 +17,11 @@ pub struct CreateTableInfo {
pub(crate) columns: Vec<ColumnDefinition>,
}

#[derive(new, Debug, Clone)]
pub struct CreateInfoBase {
pub(crate) schema: String,
#[derive(new)]
pub struct CreateTableFunctionInfo {
pub(crate) base: CreateInfoBase,
/// Function name
pub(crate) name: String,
/// Functions with different arguments
pub(crate) functions: Vec<TableFunction>,
}
3 changes: 3 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod create_info;

pub use create_info::*;
7 changes: 7 additions & 0 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use util::*;
pub use volcano_executor::*;

use crate::catalog_v2::CatalogError;
use crate::function::FunctionError;
use crate::main_entry::ClientContext;
use crate::types_v2::TypeError;

Expand Down Expand Up @@ -52,6 +53,12 @@ pub enum ExecutorError {
#[from]
TypeError,
),
#[error("function error: {0}")]
FunctionError(
#[source]
#[from]
FunctionError,
),
#[error("Executor internal error: {0}")]
InternalError(String),
}
8 changes: 5 additions & 3 deletions src/execution/physical_plan/physical_table_scan.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use derive_new::new;

use super::{PhysicalOperator, PhysicalOperatorBase};
use crate::catalog_v2::TableCatalogEntry;
use crate::execution::PhysicalPlanGenerator;
use crate::function::{FunctionData, TableFunction};
use crate::planner_v2::LogicalGet;
use crate::types_v2::LogicalType;

#[derive(new, Clone)]
pub struct PhysicalTableScan {
pub(crate) base: PhysicalOperatorBase,
pub(crate) bind_table: TableCatalogEntry,
pub(crate) function: TableFunction,
pub(crate) bind_data: FunctionData,
/// The types of ALL columns that can be returned by the table function
pub(crate) returned_types: Vec<LogicalType>,
/// The names of ALL columns that can be returned by the table function
Expand All @@ -19,7 +20,8 @@ pub struct PhysicalTableScan {
impl PhysicalPlanGenerator {
pub(crate) fn create_physical_table_scan(&self, op: LogicalGet) -> PhysicalOperator {
let base = PhysicalOperatorBase::new(vec![], op.base.types);
let plan = PhysicalTableScan::new(base, op.bind_table, op.returned_types, op.names);
let plan =
PhysicalTableScan::new(base, op.function, op.bind_data, op.returned_types, op.names);
PhysicalOperator::PhysicalTableScan(plan)
}
}
24 changes: 19 additions & 5 deletions src/execution/volcano_executor/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{ExecutionContext, ExecutorError, PhysicalTableScan, SchemaUtil};
use crate::storage_v2::LocalStorage;
use crate::function::{
GlobalTableFunctionState, SeqTableScanInitInput, TableFunctionInitInput, TableFunctionInput,
};

#[derive(new)]
pub struct TableScan {
Expand All @@ -17,10 +19,22 @@ impl TableScan {
pub async fn execute(self, context: Arc<ExecutionContext>) {
let schema = SchemaUtil::new_schema_ref(&self.plan.names, &self.plan.returned_types);

let table = self.plan.bind_table;
let mut local_storage_reader = LocalStorage::create_reader(&table.storage);
let client_context = context.clone_client_context();
while let Some(batch) = local_storage_reader.next_batch(client_context.clone()) {
let bind_data = self.plan.bind_data;

let table_scan_func = self.plan.function.function;
let global_state = if let Some(init_global_func) = self.plan.function.init_global {
let seq_table_scan_init_input = TableFunctionInitInput::SeqTableScanInitInput(
Box::new(SeqTableScanInitInput::new(bind_data.clone())),
);
init_global_func(context.clone_client_context(), seq_table_scan_init_input)?
} else {
GlobalTableFunctionState::None
};

let mut tabel_scan_input = TableFunctionInput::new(bind_data, global_state);
while let Some(batch) =
table_scan_func(context.clone_client_context(), &mut tabel_scan_input)?
{
let columns = batch.columns().to_vec();
yield RecordBatch::try_new(schema.clone(), columns)?
}
Expand Down
28 changes: 28 additions & 0 deletions src/function/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use arrow::error::ArrowError;

use crate::catalog_v2::CatalogError;
use crate::types_v2::TypeError;

#[derive(thiserror::Error, Debug)]
pub enum FunctionError {
#[error("catalog error: {0}")]
CatalogError(
#[from]
#[source]
CatalogError,
),
#[error("type error: {0}")]
TypeError(
#[from]
#[source]
TypeError,
),
#[error("arrow error: {0}")]
ArrowError(
#[from]
#[source]
ArrowError,
),
#[error("Internal error: {0}")]
InternalError(String),
}
11 changes: 11 additions & 0 deletions src/function/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
mod errors;
mod table;

pub use errors::*;
pub use table::*;

#[derive(Debug, Clone)]
pub enum FunctionData {
SeqTableScanInputData(Box<SeqTableScanInputData>),
None,
}
4 changes: 4 additions & 0 deletions src/function/table/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod seq_table_scan;
mod table_function;
pub use seq_table_scan::*;
pub use table_function::*;
Loading