Skip to content

Commit 3c2dd39

Browse files
committed
feat(sql): migrate to DataFusion-based streaming SQL planner
1 parent 6ab4638 commit 3c2dd39

38 files changed

+6199
-535
lines changed

Cargo.lock

Lines changed: 2308 additions & 114 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,36 @@ arrow-array = "52"
5252
arrow-ipc = "52"
5353
arrow-schema = "52"
5454
proctitle = "0.1"
55+
unicase = "2.7"
56+
petgraph = "0.7"
57+
itertools = "0.14"
58+
strum = { version = "0.26", features = ["derive"] }
59+
datafusion-functions-aggregate = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
60+
61+
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
62+
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '55.2.0/parquet'}
63+
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '55.2.0/json'}
64+
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
65+
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
66+
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
67+
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
68+
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
69+
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
70+
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
71+
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
72+
datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
73+
74+
sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "0.6.0/function-sql-parser" }
75+
76+
cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
77+
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
78+
jiter = {git = "https://github.com/ArroyoSystems/jiter", branch = "disable_python" }
79+
5580

5681
[features]
5782
default = ["incremental-cache", "python"]
5883
incremental-cache = ["wasmtime/incremental-cache"]
5984
python = []
85+
86+
[patch."https://github.com/ArroyoSystems/sqlparser-rs"]
87+
sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "0.6.0/function-sql-parser" }

src/coordinator/analyze/analyzer.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use super::Analysis;
1414
use crate::coordinator::execution_context::ExecutionContext;
1515
use crate::coordinator::statement::{
1616
CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction, Statement,
17-
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
17+
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction, StreamingSql,
1818
};
1919
use std::fmt;
2020

@@ -115,4 +115,13 @@ impl StatementVisitor for Analyzer<'_> {
115115
) -> StatementVisitorResult {
116116
StatementVisitorResult::Analyze(Box::new(stmt.clone()))
117117
}
118+
119+
fn visit_streaming_sql(
120+
&self,
121+
stmt: &StreamingSql,
122+
_context: &StatementVisitorContext,
123+
) -> StatementVisitorResult {
124+
// TODO: add semantic analysis for streaming SQL (schema validation, etc.)
125+
StatementVisitorResult::Analyze(Box::new(StreamingSql::new(stmt.statement.clone())))
126+
}
118127
}

src/coordinator/coordinator.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::coordinator::execution::Executor;
2020
use crate::coordinator::plan::{LogicalPlanVisitor, LogicalPlanner, PlanNode};
2121
use crate::coordinator::statement::Statement;
2222
use crate::runtime::taskexecutor::TaskManager;
23+
use crate::sql::planner::StreamSchemaProvider;
2324

2425
use super::execution_context::ExecutionContext;
2526

@@ -90,7 +91,8 @@ impl Coordinator {
9091
}
9192

9293
fn step_build_logical_plan(&self, analysis: &Analysis) -> Result<Box<dyn PlanNode>> {
93-
let visitor = LogicalPlanVisitor::new();
94+
let schema_provider = StreamSchemaProvider::new();
95+
let visitor = LogicalPlanVisitor::new(schema_provider);
9496
let plan = visitor.visit(analysis);
9597
Ok(plan)
9698
}

src/coordinator/execution/executor.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::coordinator::dataset::{ExecuteResult, ShowFunctionsResult, empty_reco
1414
use crate::coordinator::plan::{
1515
CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, PlanNode, PlanVisitor,
1616
PlanVisitorContext, PlanVisitorResult, ShowFunctionsPlan, StartFunctionPlan, StopFunctionPlan,
17+
StreamingSqlPlan,
1718
};
1819
use crate::coordinator::statement::{ConfigSource, FunctionSource};
1920
use crate::runtime::taskexecutor::TaskManager;
@@ -200,4 +201,17 @@ impl PlanVisitor for Executor {
200201

201202
PlanVisitorResult::Execute(result)
202203
}
204+
205+
fn visit_streaming_sql_plan(
206+
&self,
207+
plan: &StreamingSqlPlan,
208+
_context: &PlanVisitorContext,
209+
) -> PlanVisitorResult {
210+
// TODO: apply rewrite_plan for streaming transformations, then execute
211+
let result = Err(ExecuteError::Internal(format!(
212+
"Streaming SQL execution not yet implemented. LogicalPlan:\n{}",
213+
plan.logical_plan.display_indent()
214+
)));
215+
PlanVisitorResult::Execute(result)
216+
}
203217
}

src/coordinator/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ pub use coordinator::Coordinator;
2323
pub use dataset::{DataSet, ShowFunctionsResult};
2424
pub use statement::{
2525
CreateFunction, CreatePythonFunction, DropFunction, PythonModule, ShowFunctions, StartFunction,
26-
Statement, StopFunction,
26+
Statement, StopFunction, StreamingSql,
2727
};

src/coordinator/plan/logical_plan_visitor.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,26 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13+
use tracing::debug;
14+
1315
use crate::coordinator::analyze::analysis::Analysis;
1416
use crate::coordinator::plan::{
1517
CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, PlanNode, ShowFunctionsPlan,
16-
StartFunctionPlan, StopFunctionPlan,
18+
StartFunctionPlan, StopFunctionPlan, StreamingSqlPlan,
1719
};
1820
use crate::coordinator::statement::{
1921
CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction,
20-
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
22+
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction, StreamingSql,
2123
};
24+
use crate::sql::planner::StreamSchemaProvider;
2225

23-
#[derive(Debug, Default)]
24-
pub struct LogicalPlanVisitor;
26+
pub struct LogicalPlanVisitor {
27+
schema_provider: StreamSchemaProvider,
28+
}
2529

2630
impl LogicalPlanVisitor {
27-
pub fn new() -> Self {
28-
Self
31+
pub fn new(schema_provider: StreamSchemaProvider) -> Self {
32+
Self { schema_provider }
2933
}
3034

3135
pub fn visit(&self, analysis: &Analysis) -> Box<dyn PlanNode> {
@@ -51,7 +55,6 @@ impl StatementVisitor for LogicalPlanVisitor {
5155
let config_source = stmt.get_config_source().cloned();
5256
let extra_props = stmt.get_extra_properties().clone();
5357

54-
// Name will be read from config file during execution
5558
StatementVisitorResult::Plan(Box::new(CreateFunctionPlan::new(
5659
function_source,
5760
config_source,
@@ -106,4 +109,22 @@ impl StatementVisitor for LogicalPlanVisitor {
106109
config_content,
107110
)))
108111
}
112+
113+
fn visit_streaming_sql(
114+
&self,
115+
stmt: &StreamingSql,
116+
_context: &StatementVisitorContext,
117+
) -> StatementVisitorResult {
118+
let sql_to_rel = datafusion::sql::planner::SqlToRel::new(&self.schema_provider);
119+
120+
match sql_to_rel.sql_statement_to_plan(stmt.statement.clone()) {
121+
Ok(plan) => {
122+
debug!("Logical plan:\n{}", plan.display_graphviz());
123+
StatementVisitorResult::Plan(Box::new(StreamingSqlPlan::new(plan)))
124+
}
125+
Err(e) => {
126+
panic!("Failed to convert SQL statement to logical plan: {e}");
127+
}
128+
}
129+
}
109130
}

src/coordinator/plan/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod optimizer;
1818
mod show_functions_plan;
1919
mod start_function_plan;
2020
mod stop_function_plan;
21+
mod streaming_sql_plan;
2122
mod visitor;
2223

2324
pub use create_function_plan::CreateFunctionPlan;
@@ -28,6 +29,7 @@ pub use optimizer::LogicalPlanner;
2829
pub use show_functions_plan::ShowFunctionsPlan;
2930
pub use start_function_plan::StartFunctionPlan;
3031
pub use stop_function_plan::StopFunctionPlan;
32+
pub use streaming_sql_plan::StreamingSqlPlan;
3133
pub use visitor::{PlanVisitor, PlanVisitorContext, PlanVisitorResult};
3234

3335
use std::fmt;

src/sql/parser/mod.rs renamed to src/coordinator/plan/streaming_sql_plan.rs

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,23 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
mod sql_parser;
13+
use datafusion::logical_expr::LogicalPlan;
1414

15-
pub use sql_parser::SqlParser;
15+
use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
1616

1717
#[derive(Debug)]
18-
pub struct ParseError {
19-
pub message: String,
18+
pub struct StreamingSqlPlan {
19+
pub logical_plan: LogicalPlan,
2020
}
2121

22-
impl std::fmt::Display for ParseError {
23-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24-
write!(f, "Parse error: {}", self.message)
22+
impl StreamingSqlPlan {
23+
pub fn new(logical_plan: LogicalPlan) -> Self {
24+
Self { logical_plan }
2525
}
2626
}
2727

28-
impl std::error::Error for ParseError {}
29-
30-
impl From<String> for ParseError {
31-
fn from(message: String) -> Self {
32-
ParseError { message }
33-
}
34-
}
35-
36-
impl ParseError {
37-
pub fn new(message: impl Into<String>) -> Self {
38-
Self {
39-
message: message.into(),
40-
}
28+
impl PlanNode for StreamingSqlPlan {
29+
fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
30+
visitor.visit_streaming_sql_plan(self, context)
4131
}
4232
}

src/coordinator/plan/visitor.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
use super::{
1414
CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, ShowFunctionsPlan,
15-
StartFunctionPlan, StopFunctionPlan,
15+
StartFunctionPlan, StopFunctionPlan, StreamingSqlPlan,
1616
};
1717

1818
/// Context passed to PlanVisitor methods
@@ -84,4 +84,10 @@ pub trait PlanVisitor {
8484
plan: &CreatePythonFunctionPlan,
8585
context: &PlanVisitorContext,
8686
) -> PlanVisitorResult;
87+
88+
fn visit_streaming_sql_plan(
89+
&self,
90+
plan: &StreamingSqlPlan,
91+
context: &PlanVisitorContext,
92+
) -> PlanVisitorResult;
8793
}

0 commit comments

Comments
 (0)