Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lazy_static = "1"
strum = "0.24"
strum_macros = "0.24"
ordered-float = "3.0"
derive-new = "0.5.9"

[dev-dependencies]
test-case = "2"
Expand Down
60 changes: 60 additions & 0 deletions src/catalog_v2/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::sync::Arc;

use super::entry::{CatalogEntry, DataTable};
use super::{CatalogError, CatalogSet, TableCatalogEntry};
use crate::main_entry::ClientContext;

/// The Catalog object represents the catalog of the database.
#[derive(Clone, Debug, Default)]
pub struct Catalog {
/// The catalog set holding the schemas
schemas: CatalogSet,
/// The catalog version, incremented whenever anything changes in the catalog
catalog_version: usize,
}

impl Catalog {
pub fn create_schema(&mut self, name: String) -> Result<(), CatalogError> {
self.catalog_version += 1;
let entry = CatalogEntry::default_schema_catalog_entry(self.catalog_version, name.clone());
self.schemas.create_entry(name, entry)
}

pub fn create_table(
client_context: Arc<ClientContext>,
schema: String,
table: String,
data_table: DataTable,
) -> Result<(), CatalogError> {
let mut catalog = match client_context.db.catalog.try_write() {
Ok(c) => c,
Err(_) => return Err(CatalogError::CatalogLockedError),
};
if let CatalogEntry::SchemaCatalogEntry(mut entry) =
catalog.schemas.get_entry(schema.clone())?
{
catalog.catalog_version += 1;
entry.create_table(catalog.catalog_version, table, data_table)?;
catalog
.schemas
.replace_entry(schema, CatalogEntry::SchemaCatalogEntry(entry))?;
return Ok(());
}
Err(CatalogError::CatalogEntryTypeNotMatch)
}

pub fn get_table(
client_context: Arc<ClientContext>,
schema: String,
table: String,
) -> Result<TableCatalogEntry, CatalogError> {
let catalog = match client_context.db.catalog.try_read() {
Ok(c) => c,
Err(_) => return Err(CatalogError::CatalogLockedError),
};
if let CatalogEntry::SchemaCatalogEntry(entry) = catalog.schemas.get_entry(schema)? {
return entry.get_table(table);
}
Err(CatalogError::CatalogEntryTypeNotMatch)
}
}
49 changes: 49 additions & 0 deletions src/catalog_v2/catalog_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::collections::HashMap;

use super::{CatalogEntry, CatalogError};

/// The Catalog Set stores (key, value) map of a set of CatalogEntries
#[derive(Clone, Debug, Default)]
pub struct CatalogSet {
/// The set of catalog entries, entry index to entry
entries: HashMap<usize, CatalogEntry>,
/// Mapping of string to catalog entry index
mapping: HashMap<String, usize>,
/// The current catalog entry index
current_entry: usize,
}

impl CatalogSet {
pub fn create_entry(&mut self, name: String, entry: CatalogEntry) -> Result<(), CatalogError> {
if self.mapping.get(&name).is_some() {
return Err(CatalogError::CatalogEntryExists(name));
}
self.current_entry += 1;
self.entries.insert(self.current_entry, entry);
self.mapping.insert(name, self.current_entry);
Ok(())
}

pub fn get_entry(&self, name: String) -> Result<CatalogEntry, CatalogError> {
if let Some(index) = self.mapping.get(&name) {
if let Some(entry) = self.entries.get(index) {
return Ok(entry.clone());
}
}
Err(CatalogError::CatalogEntryNotExists(name))
}

pub fn replace_entry(
&mut self,
name: String,
new_entry: CatalogEntry,
) -> Result<(), CatalogError> {
if let Some(old_entry_index) = self.mapping.get(&name) {
if self.entries.get(old_entry_index).is_some() {
self.entries.insert(*old_entry_index, new_entry);
return Ok(());
}
}
Err(CatalogError::CatalogEntryNotExists(name))
}
}
1 change: 1 addition & 0 deletions src/catalog_v2/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub static DEFAULT_SCHEMA: &str = "main";
27 changes: 27 additions & 0 deletions src/catalog_v2/entry/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
mod schema_catalog_entry;
mod table_catalog_entry;

use derive_new::new;
pub use schema_catalog_entry::*;
pub use table_catalog_entry::*;

#[derive(Clone, Debug)]
pub enum CatalogEntry {
SchemaCatalogEntry(SchemaCatalogEntry),
TableCatalogEntry(TableCatalogEntry),
}

impl CatalogEntry {
pub fn default_schema_catalog_entry(oid: usize, schema: String) -> Self {
Self::SchemaCatalogEntry(SchemaCatalogEntry::new(oid, schema))
}
}

#[allow(dead_code)]
#[derive(new, Clone, Debug)]
pub struct CatalogEntryBase {
/// The object identifier of the entry
oid: usize,
/// The name of the entry
name: String,
}
38 changes: 38 additions & 0 deletions src/catalog_v2/entry/schema_catalog_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use super::table_catalog_entry::{DataTable, TableCatalogEntry};
use super::{CatalogEntry, CatalogEntryBase};
use crate::catalog_v2::{CatalogError, CatalogSet};

#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct SchemaCatalogEntry {
base: CatalogEntryBase,
tables: CatalogSet,
}

impl SchemaCatalogEntry {
pub fn new(oid: usize, schema: String) -> Self {
Self {
base: CatalogEntryBase::new(oid, schema),
tables: CatalogSet::default(),
}
}

pub fn create_table(
&mut self,
oid: usize,
table: String,
storage: DataTable,
) -> Result<(), CatalogError> {
let entry =
CatalogEntry::TableCatalogEntry(TableCatalogEntry::new(oid, table.clone(), storage));
self.tables.create_entry(table, entry)?;
Ok(())
}

pub fn get_table(&self, table: String) -> Result<TableCatalogEntry, CatalogError> {
match self.tables.get_entry(table.clone())? {
CatalogEntry::TableCatalogEntry(e) => Ok(e),
_ => Err(CatalogError::CatalogEntryNotExists(table)),
}
}
}
64 changes: 64 additions & 0 deletions src/catalog_v2/entry/table_catalog_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::collections::HashMap;

use derive_new::new;

use super::CatalogEntryBase;
use crate::types_v2::LogicalType;

#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct TableCatalogEntry {
pub(crate) base: CatalogEntryBase,
pub(crate) storage: DataTable,
/// A list of columns that are part of this table
pub(crate) columns: Vec<ColumnDefinition>,
/// A map of column name to column index
pub(crate) name_map: HashMap<String, usize>,
}

impl TableCatalogEntry {
pub fn new(oid: usize, table: String, storage: DataTable) -> Self {
let mut name_map = HashMap::new();
let mut columns = vec![];
storage
.column_definitions
.iter()
.enumerate()
.for_each(|(idx, col)| {
columns.push(col.clone());
name_map.insert(col.name.clone(), idx);
});
Self {
base: CatalogEntryBase::new(oid, table),
storage,
columns,
name_map,
}
}
}

/// DataTable represents a physical table on disk
#[derive(new, Clone, Debug, PartialEq, Eq, Hash)]
pub struct DataTable {
/// The table info
pub(crate) info: DataTableInfo,
/// The set of physical columns stored by this DataTable
pub(crate) column_definitions: Vec<ColumnDefinition>,
}

#[derive(new, Clone, Debug, PartialEq, Eq, Hash)]
pub struct DataTableInfo {
/// schema of the table
pub(crate) schema: String,
/// name of the table
pub(crate) table: String,
}

/// A column of a table
#[derive(new, Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDefinition {
/// The name of the entry
pub(crate) name: String,
/// The type of the column
pub(crate) ty: LogicalType,
}
11 changes: 11 additions & 0 deletions src/catalog_v2/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[derive(thiserror::Error, Debug)]
pub enum CatalogError {
#[error("CatalogEntry: {0} already exists")]
CatalogEntryExists(String),
#[error("CatalogEntry: {0} not exists")]
CatalogEntryNotExists(String),
#[error("CatalogEntry type not match")]
CatalogEntryTypeNotMatch,
#[error("Catalog locked, please retry")]
CatalogLockedError,
}
11 changes: 11 additions & 0 deletions src/catalog_v2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
mod catalog;
mod catalog_set;
mod constants;
mod entry;
mod errors;

pub use catalog::*;
pub use catalog_set::*;
pub use constants::*;
pub use entry::*;
pub use errors::*;
21 changes: 19 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use std::fs::File;
use std::sync::Arc;

use anyhow::{Error, Result};
use rustyline::error::ReadlineError;
use rustyline::Editor;

use crate::main_entry::ClientContext;
use crate::util::pretty_batches;
use crate::Database;

pub async fn interactive(db: Database) -> Result<()> {
pub async fn interactive(db: Database, client_context: Arc<ClientContext>) -> Result<()> {
let mut rl = Editor::<()>::new()?;
load_history(&mut rl);

let mut enable_v2 = false;

loop {
let read_sql = read_sql(&mut rl);
match read_sql {
Expand All @@ -19,7 +23,20 @@ pub async fn interactive(db: Database) -> Result<()> {
rl.add_history_entry(sql.as_str());
let start_time = std::time::Instant::now();

run_sql(&db, sql).await?;
if sql.starts_with("enable_v2") {
enable_v2 = true;
println!("---- enable sqlrs v2 ! ----");
continue;
}

if enable_v2 {
match client_context.query(sql).await {
Ok(_) => {}
Err(err) => println!("Run Error: {}", err),
}
} else {
run_sql(&db, sql).await?;
}

let end_time = std::time::Instant::now();
let time_consumed = end_time.duration_since(start_time);
Expand Down
39 changes: 39 additions & 0 deletions src/execution/column_binding_resolver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::planner_v2::{
BoundColumnRefExpression, BoundExpression, BoundExpressionBase, BoundReferenceExpression,
ColumnBinding, LogicalOperator, LogicalOperatorVisitor,
};

#[derive(Default)]
pub struct ColumnBindingResolver {
bindings: Vec<ColumnBinding>,
}

impl LogicalOperatorVisitor for ColumnBindingResolver {
fn visit_operator(&mut self, op: &mut LogicalOperator) {
{
self.visit_operator_children(op);
self.visit_operator_expressions(op);
self.bindings = op.get_column_bindings();
}
}

fn visit_replace_column_ref(&self, expr: &BoundColumnRefExpression) -> Option<BoundExpression> {
assert!(expr.depth == 0);
// check the current set of column bindings to see which index corresponds to the column
// reference
if let Some(idx) = self.bindings.iter().position(|e| expr.binding == *e) {
let expr = BoundReferenceExpression::new(
BoundExpressionBase::new(expr.base.alias.clone(), expr.base.return_type.clone()),
idx,
);
return Some(BoundExpression::BoundReferenceExpression(expr));
}

// could not bind the column reference, this should never happen and indicates a bug in the
// code generate an error message
panic!(
"Failed to bind column reference {} [{}.{}] (bindings: {:?}), ",
expr.base.alias, expr.binding.table_idx, expr.binding.column_idx, self.bindings
);
}
}
Loading