Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ ordered-float = "3.0"
derive-new = "0.5.9"
log = "0.4"
env_logger = "0.10"
derive_builder = "0.12.0"
async-stream = "0.3"

[dev-dependencies]
test-case = "2"
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ clean:
run:
cargo run --release

run_v2:
ENABLE_V2=1 cargo run --release

debug:
RUST_BACKTRACE=1 cargo run

Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ describe t1;
-- previous SQL statements
select v1+1 as a from t1 where a >= 2;
select v1 from t1 limit 2 offset 1;
-- table functions
select * from sqlrs_tables();
select * from sqlrs_columns();
select * from read_csv('t1.csv');
select * from read_csv('t1.csv', header=>true, delim=>',');
select * from 't1.csv';
```


Expand Down
4 changes: 2 additions & 2 deletions src/execution/physical_plan_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ impl PhysicalPlanGenerator {
) -> PhysicalOperatorBase {
let children = base
.children
.iter()
.map(|op| self.create_plan_internal(op.clone()))
.into_iter()
.map(|op| self.create_plan_internal(op))
.collect::<Vec<_>>();
PhysicalOperatorBase::new(children, base.expressioins)
}
Expand Down
13 changes: 7 additions & 6 deletions src/execution/volcano_executor/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ impl TableScan {

let function = self.plan.function;
let table_scan_func = function.function;
let mut tabel_scan_input = TableFunctionInput::new(bind_data);
let tabel_scan_input = TableFunctionInput::new(bind_data);

while let Some(batch) =
table_scan_func(context.clone_client_context(), &mut tabel_scan_input)?
{
let scan_stream = table_scan_func(context.clone_client_context(), tabel_scan_input)?;

#[for_await]
for batch in scan_stream {
let batch = batch?;
let columns = batch.columns().to_vec();
let try_new = RecordBatch::try_new(schema.clone(), columns)?;
yield try_new
yield RecordBatch::try_new(schema.clone(), columns)?
}
}
}
14 changes: 14 additions & 0 deletions src/function/errors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use std::io;

use arrow::error::ArrowError;

use crate::catalog_v2::CatalogError;
use crate::planner_v2::BindError;
use crate::types_v2::TypeError;

pub type FunctionResult<T> = Result<T, FunctionError>;

// TODO: refactor error using https://docs.rs/snafu/latest/snafu/
#[derive(thiserror::Error, Debug)]
pub enum FunctionError {
#[error("catalog error: {0}")]
Expand Down Expand Up @@ -31,4 +37,12 @@ pub enum FunctionError {
ComparisonError(String),
#[error("Conjunction error: {0}")]
ConjunctionError(String),
#[error("io error")]
IoError(#[from] io::Error),
}

impl From<BindError> for FunctionError {
fn from(e: BindError) -> Self {
FunctionError::InternalError(e.to_string())
}
}
2 changes: 2 additions & 0 deletions src/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum FunctionData {
SeqTableScanInputData(Box<SeqTableScanInputData>),
SqlrsTablesData(Box<SqlrsTablesData>),
SqlrsColumnsData(Box<SqlrsColumnsData>),
ReadCSVInputData(Box<ReadCSVInputData>),
}

#[derive(new)]
Expand Down Expand Up @@ -61,6 +62,7 @@ impl BuiltinFunctions {
SubtractFunction::register_function(self)?;
MultiplyFunction::register_function(self)?;
DivideFunction::register_function(self)?;
ReadCSV::register_function(self)?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/function/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod read_csv;
mod seq_table_scan;
mod sqlrs_columns;
mod sqlrs_tables;
mod table_function;
pub use read_csv::*;
pub use seq_table_scan::*;
pub use sqlrs_columns::*;
pub use sqlrs_tables::*;
Expand Down
Loading