diff --git a/Cargo.toml b/Cargo.toml index 813d9e94f5b31..db2cc46e9079b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "datafusion/physical-expr", "datafusion/proto", "datafusion/row", + "datafusion/sql", "datafusion-examples", "benchmarks", ] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 6c541059b5c16..f8170443e2135 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -51,7 +51,7 @@ regex_expressions = ["datafusion-physical-expr/regex_expressions"] # Used to enable scheduler scheduler = ["rayon"] simd = ["arrow/simd"] -unicode_expressions = ["datafusion-physical-expr/regex_expressions"] +unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-sql/unicode_expressions"] [dependencies] ahash = { version = "0.7", default-features = false } @@ -65,6 +65,7 @@ datafusion-expr = { path = "../expr", version = "8.0.0" } datafusion-jit = { path = "../jit", version = "8.0.0", optional = true } datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" } datafusion-row = { path = "../row", version = "8.0.0" } +datafusion-sql = { path = "../sql", version = "8.0.0" } futures = "0.3" hashbrown = { version = "0.12", features = ["raw"] } lazy_static = { version = "^1.4.0" } diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog/mod.rs index b9bf03e4f9086..0720f451ec6c7 100644 --- a/datafusion/core/src/catalog/mod.rs +++ b/datafusion/core/src/catalog/mod.rs @@ -23,109 +23,4 @@ pub mod catalog; pub mod information_schema; pub mod schema; -/// Represents a resolved path to a table of the form "catalog.schema.table" -#[derive(Clone, Copy)] -pub struct ResolvedTableReference<'a> { - /// The catalog (aka database) containing the table - pub catalog: &'a str, - /// The schema containing the table - pub schema: &'a str, - /// The table name - pub table: &'a str, -} - -/// Represents a path to a table that may require further resolution -#[derive(Clone, 
Copy)] -pub enum TableReference<'a> { - /// An unqualified table reference, e.g. "table" - Bare { - /// The table name - table: &'a str, - }, - /// A partially resolved table reference, e.g. "schema.table" - Partial { - /// The schema containing the table - schema: &'a str, - /// The table name - table: &'a str, - }, - /// A fully resolved table reference, e.g. "catalog.schema.table" - Full { - /// The catalog (aka database) containing the table - catalog: &'a str, - /// The schema containing the table - schema: &'a str, - /// The table name - table: &'a str, - }, -} - -impl<'a> TableReference<'a> { - /// Retrieve the actual table name, regardless of qualification - pub fn table(&self) -> &str { - match self { - Self::Full { table, .. } - | Self::Partial { table, .. } - | Self::Bare { table } => table, - } - } - - /// Given a default catalog and schema, ensure this table reference is fully resolved - pub fn resolve( - self, - default_catalog: &'a str, - default_schema: &'a str, - ) -> ResolvedTableReference<'a> { - match self { - Self::Full { - catalog, - schema, - table, - } => ResolvedTableReference { - catalog, - schema, - table, - }, - Self::Partial { schema, table } => ResolvedTableReference { - catalog: default_catalog, - schema, - table, - }, - Self::Bare { table } => ResolvedTableReference { - catalog: default_catalog, - schema: default_schema, - table, - }, - } - } -} - -impl<'a> From<&'a str> for TableReference<'a> { - fn from(s: &'a str) -> Self { - let parts: Vec<&str> = s.split('.').collect(); - - match parts.len() { - 1 => Self::Bare { table: s }, - 2 => Self::Partial { - schema: parts[0], - table: parts[1], - }, - 3 => Self::Full { - catalog: parts[0], - schema: parts[1], - table: parts[2], - }, - _ => Self::Bare { table: s }, - } - } -} - -impl<'a> From> for TableReference<'a> { - fn from(resolved: ResolvedTableReference<'a>) -> Self { - Self::Full { - catalog: resolved.catalog, - schema: resolved.schema, - table: resolved.table, - } - } -} +pub use 
datafusion_sql::{ResolvedTableReference, TableReference}; diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 629adf137fe31..619ac13b1365d 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -54,7 +54,6 @@ use arrow::datatypes::{DataType, SchemaRef}; use crate::catalog::{ catalog::{CatalogProvider, MemoryCatalogProvider}, schema::{MemorySchemaProvider, SchemaProvider}, - ResolvedTableReference, TableReference, }; use crate::dataframe::DataFrame; use crate::datasource::listing::ListingTableConfig; @@ -73,6 +72,7 @@ use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::optimizer::simplify_expressions::SimplifyExpressions; use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy; use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin; +use datafusion_sql::{ResolvedTableReference, TableReference}; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec; @@ -86,13 +86,14 @@ use crate::physical_plan::udaf::AggregateUDF; use crate::physical_plan::udf::ScalarUDF; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::PhysicalPlanner; -use crate::sql::{ - parser::DFParser, - planner::{ContextProvider, SqlToRel}, -}; use crate::variable::{VarProvider, VarType}; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_expr::TableSource; +use datafusion_sql::{ + parser::DFParser, + planner::{ContextProvider, SqlToRel}, +}; use parquet::file::properties::WriterProperties; use uuid::Uuid; @@ -1423,15 +1424,18 @@ impl SessionState { } impl ContextProvider for SessionState { - fn get_table_provider(&self, name: TableReference) -> Result> { + fn get_table_provider(&self, name: TableReference) -> Result> { let resolved_ref = self.resolve_table_ref(name); match self.schema_for_ref(resolved_ref) { - Ok(schema) => 
schema.table(resolved_ref.table).ok_or_else(|| { - DataFusionError::Plan(format!( - "'{}.{}.{}' not found", - resolved_ref.catalog, resolved_ref.schema, resolved_ref.table - )) - }), + Ok(schema) => { + let provider = schema.table(resolved_ref.table).ok_or_else(|| { + DataFusionError::Plan(format!( + "'{}.{}.{}' not found", + resolved_ref.catalog, resolved_ref.schema, resolved_ref.table + )) + })?; + Ok(provider_as_source(provider)) + } Err(e) => Err(e), } } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index b553c0ed84b53..600e24fb8f187 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -220,7 +220,6 @@ pub mod prelude; pub mod scalar; #[cfg(feature = "scheduler")] pub mod scheduler; -pub mod sql; pub mod variable; // re-export dependencies from arrow-rs to minimise version maintenance for crate users @@ -232,6 +231,7 @@ pub use datafusion_common as common; pub use datafusion_data_access; pub use datafusion_expr as logical_expr; pub use datafusion_physical_expr as physical_expr; +pub use datafusion_sql as sql; pub use datafusion_row as row; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 27dd46b072d4b..50e86e1712f27 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -53,7 +53,6 @@ use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{join_utils, Partitioning}; use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr}; use crate::scalar::ScalarValue; -use crate::sql::utils::window_expr_common_partition_keys; use crate::variable::VarType; use crate::{ error::{DataFusionError, Result}, @@ -65,6 +64,7 @@ use arrow::{compute::can_cast_types, datatypes::DataType}; use async_trait::async_trait; use datafusion_expr::expr::GroupingSet; use datafusion_physical_expr::expressions::DateIntervalExpr; +use datafusion_sql::utils::window_expr_common_partition_keys; 
use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::{debug, trace}; diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml new file mode 100644 index 0000000000000..673823c37ad22 --- /dev/null +++ b/datafusion/sql/Cargo.toml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "datafusion-sql" +description = "DataFusion SQL Query Planner" +version = "8.0.0" +homepage = "https://github.com/apache/arrow-datafusion" +repository = "https://github.com/apache/arrow-datafusion" +readme = "README.md" +authors = ["Apache Arrow <dev@arrow.apache.org>"] +license = "Apache-2.0" +keywords = [ "datafusion", "sql", "parser", "planner" ] +edition = "2021" +rust-version = "1.59" + +[lib] +name = "datafusion_sql" +path = "src/lib.rs" + +[features] +default = ["unicode_expressions"] +unicode_expressions = [] + +[dependencies] +ahash = { version = "0.7", default-features = false } +arrow = { version = "14.0.0", features = ["prettyprint"] } +datafusion-common = { path = "../common", version = "8.0.0" } +datafusion-expr = { path = "../expr", version = "8.0.0" } +hashbrown = "0.12" +sqlparser = "0.17" +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion/sql/README.md b/datafusion/sql/README.md new file mode 100644 index 0000000000000..18abeb23e20f1 --- /dev/null +++ b/datafusion/sql/README.md @@ -0,0 +1,26 @@ + + +# DataFusion SQL Query Planner + +[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. + +This crate is a submodule of DataFusion that provides a SQL query planner. 
+ +[df]: https://crates.io/crates/datafusion diff --git a/datafusion/core/src/sql/mod.rs b/datafusion/sql/src/lib.rs similarity index 90% rename from datafusion/core/src/sql/mod.rs rename to datafusion/sql/src/lib.rs index cc8b004505fbc..75da587f9e5d1 100644 --- a/datafusion/core/src/sql/mod.rs +++ b/datafusion/sql/src/lib.rs @@ -20,4 +20,7 @@ pub mod parser; pub mod planner; -pub(crate) mod utils; +mod table_reference; +pub mod utils; + +pub use table_reference::{ResolvedTableReference, TableReference}; diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/sql/src/parser.rs similarity index 99% rename from datafusion/core/src/sql/parser.rs rename to datafusion/sql/src/parser.rs index b0c44127edfd8..da4638764d7ba 100644 --- a/datafusion/core/src/sql/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -19,7 +19,7 @@ //! //! Declares a SQL parser based on sqlparser that handles custom formats that we need. -use crate::logical_plan::FileType; +use datafusion_expr::logical_plan::FileType; use sqlparser::{ ast::{ColumnDef, ColumnOptionDef, Statement as SQLStatement, TableConstraint}, dialect::{keywords::Keyword, Dialect, GenericDialect}, diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/sql/src/planner.rs similarity index 97% rename from datafusion/core/src/sql/planner.rs rename to datafusion/sql/src/planner.rs index 8864caacbc002..ba05a224523dc 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -17,45 +17,44 @@ //! 
SQL Query Planner (produces logical plan from SQL AST) +use crate::parser::{CreateExternalTable, Statement as DFStatement}; +use arrow::datatypes::*; +use datafusion_common::ToDFSchema; +use datafusion_expr::expr_rewriter::normalize_col; +use datafusion_expr::expr_rewriter::normalize_col_with_schemas; +use datafusion_expr::logical_plan::{ + Analyze, CreateCatalog, CreateCatalogSchema, + CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView, + DropTable, Explain, FileType, JoinType, LogicalPlan, LogicalPlanBuilder, PlanType, + ToStringifiedPlan, +}; +use datafusion_expr::utils::{ + expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, exprlist_to_columns, + find_aggregate_exprs, find_column_exprs, find_window_exprs, +}; +use datafusion_expr::{ + and, col, lit, AggregateFunction, AggregateUDF, Expr, Operator, ScalarUDF, + WindowFrame, WindowFrameUnits, +}; +use datafusion_expr::{ + window_function::WindowFunction, BuiltinScalarFunction, TableSource, +}; +use hashbrown::HashMap; use std::collections::HashSet; use std::iter; use std::str::FromStr; use std::sync::Arc; use std::{convert::TryInto, vec}; -use crate::catalog::TableReference; -use crate::datasource::TableProvider; -use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; -use crate::logical_plan::Expr::Alias; -use crate::logical_plan::{ - and, col, lit, normalize_col, normalize_col_with_schemas, provider_as_source, Column, - CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, - CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, FileType, - LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, +use crate::table_reference::TableReference; +use crate::utils::{make_decimal_type, normalize_ident, resolve_columns}; +use datafusion_common::{ + field_not_found, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, }; -use crate::prelude::JoinType; -use 
crate::scalar::ScalarValue; -use crate::sql::utils::{make_decimal_type, normalize_ident, resolve_columns}; -use crate::{ - error::{DataFusionError, Result}, - logical_expr::utils::{expand_qualified_wildcard, expand_wildcard}, - physical_plan::aggregates, - physical_plan::udaf::AggregateUDF, - physical_plan::udf::ScalarUDF, - sql::parser::{CreateExternalTable, Statement as DFStatement}, -}; -use arrow::datatypes::*; -use datafusion_expr::utils::{ - expr_as_column_expr, exprlist_to_columns, find_aggregate_exprs, find_column_exprs, - find_window_exprs, -}; -use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction}; -use hashbrown::HashMap; - -use datafusion_common::field_not_found; use datafusion_expr::expr::GroupingSet; use datafusion_expr::logical_plan::builder::project_with_alias; use datafusion_expr::logical_plan::{Filter, Subquery}; +use datafusion_expr::Expr::Alias; use sqlparser::ast::{ BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, @@ -74,13 +73,12 @@ use super::{ resolve_aliases_to_exprs, resolve_positions_to_exprs, }, }; -use crate::logical_plan::plan::{Analyze, Explain}; /// The ContextProvider trait allows the query planner to obtain meta-data about tables and /// functions referenced in SQL statements pub trait ContextProvider { /// Getter for a datasource - fn get_table_provider(&self, name: TableReference) -> Result>; + fn get_table_provider(&self, name: TableReference) -> Result>; /// Getter for a UDF description fn get_function_meta(&self, name: &str) -> Option>; /// Getter for a UDAF description @@ -714,11 +712,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { _ => Ok(cte_plan.clone()), }, (_, Ok(provider)) => { - let scan = LogicalPlanBuilder::scan( - &table_name, - provider_as_source(provider), - None, - ); + let scan = + LogicalPlanBuilder::scan(&table_name, provider, None); let scan = match table_alias.as_ref() { 
Some(ref name) => scan?.alias(name.to_owned().as_str()), _ => scan, @@ -2040,7 +2035,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } // next, aggregate built-ins - if let Ok(fun) = aggregates::AggregateFunction::from_str(&name) { + if let Ok(fun) = AggregateFunction::from_str(&name) { let distinct = function.distinct; let (fun, args) = self.aggregate_fn_to_expr(fun, function, schema)?; return Ok(Expr::AggregateFunction { @@ -2152,12 +2147,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fn aggregate_fn_to_expr( &self, - fun: aggregates::AggregateFunction, + fun: AggregateFunction, function: sqlparser::ast::Function, schema: &DFSchema, - ) -> Result<(aggregates::AggregateFunction, Vec)> { + ) -> Result<(AggregateFunction, Vec)> { let args = match fun { - aggregates::AggregateFunction::Count => function + AggregateFunction::Count => function .args .into_iter() .map(|a| match a { @@ -2168,7 +2163,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { _ => self.sql_fn_arg_to_logical_expr(a, schema, &mut HashMap::new()), }) .collect::>>()?, - aggregates::AggregateFunction::ApproxMedian => function + AggregateFunction::ApproxMedian => function .args .into_iter() .map(|a| self.sql_fn_arg_to_logical_expr(a, schema, &mut HashMap::new())) @@ -2178,9 +2173,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; let fun = match fun { - aggregates::AggregateFunction::ApproxMedian => { - aggregates::AggregateFunction::ApproxPercentileCont - } + AggregateFunction::ApproxMedian => AggregateFunction::ApproxPercentileCont, _ => fun, }; @@ -2649,14 +2642,9 @@ fn parse_sql_number(n: &str) -> Result { #[cfg(test)] mod tests { - use crate::datasource::empty::EmptyTable; - use crate::execution::context::ExecutionProps; - use crate::optimizer::limit_push_down::LimitPushDown; - use crate::optimizer::optimizer::OptimizerRule; - use crate::{assert_contains, logical_plan::create_udf, sql::parser::DFParser}; - use datafusion_expr::{ScalarFunctionImplementation, Volatility}; - use 
super::*; + use crate::assert_contains; + use std::any::Any; #[test] fn select_no_relation() { @@ -4393,23 +4381,13 @@ mod tests { assert_eq!(format!("{:?}", plan), expected); } - fn quick_test_with_limit_pushdown(sql: &str, expected: &str) { - let plan = logical_plan(sql).unwrap(); - let rule = LimitPushDown::new(); - let optimized_plan = rule - .optimize(&plan, &ExecutionProps::new()) - .expect("failed to optimize plan"); - let formatted_plan = format!("{:?}", optimized_plan); - assert_eq!(formatted_plan, expected); - } - struct MockContextProvider {} impl ContextProvider for MockContextProvider { fn get_table_provider( &self, name: TableReference, - ) -> Result> { + ) -> Result> { let schema = match name.table() { "test" => Ok(Schema::new(vec![ Field::new("t_date32", DataType::Date32, false), @@ -4485,19 +4463,8 @@ mod tests { } } - fn get_function_meta(&self, name: &str) -> Option> { - let f: ScalarFunctionImplementation = - Arc::new(|_| Err(DataFusionError::NotImplemented("".to_string()))); - match name { - "my_sqrt" => Some(Arc::new(create_udf( - "my_sqrt", - vec![DataType::Float64], - Arc::new(DataType::Float64), - Volatility::Immutable, - f, - ))), - _ => None, - } + fn get_function_meta(&self, _name: &str) -> Option> { + unimplemented!() } fn get_aggregate_meta(&self, _name: &str) -> Option> { @@ -4869,33 +4836,32 @@ mod tests { fn test_offset_no_limit() { let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 5;"; let expected = "Offset: 5\ - \n Projection: #person.id\ - \n Filter: #person.id > Int64(100)\ - \n TableScan: person projection=None"; + \n Projection: #person.id\ + \n Filter: #person.id > Int64(100)\ + \n TableScan: person projection=None"; quick_test(sql, expected); } #[test] - fn test_offset_after_limit_with_limit_push() { + fn test_offset_after_limit() { let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;"; let expected = "Offset: 3\ - \n Limit: 8\ - \n Projection: #person.id\ - \n Filter: #person.id > 
Int64(100)\ - \n TableScan: person projection=None"; - - quick_test_with_limit_pushdown(sql, expected); + \n Limit: 5\ + \n Projection: #person.id\ + \n Filter: #person.id > Int64(100)\ + \n TableScan: person projection=None"; + quick_test(sql, expected); } #[test] - fn test_offset_before_limit_with_limit_push() { + fn test_offset_before_limit() { let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;"; let expected = "Offset: 3\ - \n Limit: 8\ - \n Projection: #person.id\ - \n Filter: #person.id > Int64(100)\ - \n TableScan: person projection=None"; - quick_test_with_limit_pushdown(sql, expected); + \n Limit: 5\ + \n Projection: #person.id\ + \n Filter: #person.id > Int64(100)\ + \n TableScan: person projection=None"; + quick_test(sql, expected); } fn assert_field_not_found(err: DataFusionError, name: &str) { @@ -4910,4 +4876,47 @@ mod tests { _ => panic!("assert_field_not_found wrong error type"), } } + + /// A macro to assert that one string is contained within another with + /// a nice error message if they are not. + /// + /// Usage: `assert_contains!(actual, expected)` + /// + /// Is a macro so test error + /// messages are on the same line as the failure; + /// + /// Both arguments must be convertible into Strings (Into<String>) + #[macro_export] + macro_rules! 
assert_contains { + ($ACTUAL: expr, $EXPECTED: expr) => { + let actual_value: String = $ACTUAL.into(); + let expected_value: String = $EXPECTED.into(); + assert!( + actual_value.contains(&expected_value), + "Can not find expected in actual.\n\nExpected:\n{}\n\nActual:\n{}", + expected_value, + actual_value + ); + }; + } + + struct EmptyTable { + table_schema: SchemaRef, + } + + impl EmptyTable { + fn new(table_schema: SchemaRef) -> Self { + Self { table_schema } + } + } + + impl TableSource for EmptyTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.table_schema.clone() + } + } } diff --git a/datafusion/sql/src/table_reference.rs b/datafusion/sql/src/table_reference.rs new file mode 100644 index 0000000000000..44848eb991942 --- /dev/null +++ b/datafusion/sql/src/table_reference.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +/// Represents a resolved path to a table of the form "catalog.schema.table" +#[derive(Clone, Copy)] +pub struct ResolvedTableReference<'a> { + /// The catalog (aka database) containing the table + pub catalog: &'a str, + /// The schema containing the table + pub schema: &'a str, + /// The table name + pub table: &'a str, +} + +/// Represents a path to a table that may require further resolution +#[derive(Clone, Copy)] +pub enum TableReference<'a> { + /// An unqualified table reference, e.g. "table" + Bare { + /// The table name + table: &'a str, + }, + /// A partially resolved table reference, e.g. "schema.table" + Partial { + /// The schema containing the table + schema: &'a str, + /// The table name + table: &'a str, + }, + /// A fully resolved table reference, e.g. "catalog.schema.table" + Full { + /// The catalog (aka database) containing the table + catalog: &'a str, + /// The schema containing the table + schema: &'a str, + /// The table name + table: &'a str, + }, +} + +impl<'a> TableReference<'a> { + /// Retrieve the actual table name, regardless of qualification + pub fn table(&self) -> &str { + match self { + Self::Full { table, .. } + | Self::Partial { table, .. 
} + | Self::Bare { table } => table, + } + } + + /// Given a default catalog and schema, ensure this table reference is fully resolved + pub fn resolve( + self, + default_catalog: &'a str, + default_schema: &'a str, + ) -> ResolvedTableReference<'a> { + match self { + Self::Full { + catalog, + schema, + table, + } => ResolvedTableReference { + catalog, + schema, + table, + }, + Self::Partial { schema, table } => ResolvedTableReference { + catalog: default_catalog, + schema, + table, + }, + Self::Bare { table } => ResolvedTableReference { + catalog: default_catalog, + schema: default_schema, + table, + }, + } + } +} + +impl<'a> From<&'a str> for TableReference<'a> { + fn from(s: &'a str) -> Self { + let parts: Vec<&str> = s.split('.').collect(); + + match parts.len() { + 1 => Self::Bare { table: s }, + 2 => Self::Partial { + schema: parts[0], + table: parts[1], + }, + 3 => Self::Full { + catalog: parts[0], + schema: parts[1], + table: parts[2], + }, + _ => Self::Bare { table: s }, + } + } +} + +impl<'a> From> for TableReference<'a> { + fn from(resolved: ResolvedTableReference<'a>) -> Self { + Self::Full { + catalog: resolved.catalog, + schema: resolved.schema, + table: resolved.table, + } + } +} diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/sql/src/utils.rs similarity index 98% rename from datafusion/core/src/sql/utils.rs rename to datafusion/sql/src/utils.rs index 31034975e5515..92e98813501b4 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -20,11 +20,10 @@ use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION}; use sqlparser::ast::Ident; -use crate::error::{DataFusionError, Result}; -use crate::logical_plan::{Expr, LogicalPlan}; -use crate::scalar::ScalarValue; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::expr::GroupingSet; use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; +use datafusion_expr::{Expr, LogicalPlan}; use std::collections::HashMap; /// Make a 
best-effort attempt at resolving all columns in the expression tree @@ -422,9 +421,7 @@ pub(crate) fn resolve_aliases_to_exprs( /// given a slice of window expressions sharing the same sort key, find their common partition /// keys. -pub(crate) fn window_expr_common_partition_keys( - window_exprs: &[Expr], -) -> Result<&[Expr]> { +pub fn window_expr_common_partition_keys(window_exprs: &[Expr]) -> Result<&[Expr]> { let all_partition_keys = window_exprs .iter() .map(|expr| match expr { diff --git a/dev/release/README.md b/dev/release/README.md index ac07b4f27603c..c7ef713daa168 100644 --- a/dev/release/README.md +++ b/dev/release/README.md @@ -329,6 +329,7 @@ dot -Tsvg dev/release/crate-deps.dot > dev/release/crate-deps.svg (cd datafusion/data-access && cargo publish) (cd datafusion/common && cargo publish) (cd datafusion/expr && cargo publish) +(cd datafusion/sql && cargo publish) (cd datafusion/physical-expr && cargo publish) (cd datafusion/jit && cargo publish) (cd datafusion/row && cargo publish) @@ -337,15 +338,6 @@ dot -Tsvg dev/release/crate-deps.dot > dev/release/crate-deps.svg (cd datafusion-cli && cargo publish) ``` -If there is a ballista release, run - -```shell -(cd ballista/rust/core && cargo publish) -(cd ballista/rust/executor && cargo publish) -(cd ballista/rust/scheduler && cargo publish) -(cd ballista/rust/client && cargo publish) -(cd ballista-cli && cargo publish) -``` ### Publish datafusion-cli on Homebrew and crates.io @@ -379,15 +371,12 @@ We have published new versions of datafusion and ballista to crates.io: https://crates.io/crates/datafusion/8.0.0 https://crates.io/crates/datafusion-cli/8.0.0 -https://crates.io/crates/datafusion-expr/8.0.0 https://crates.io/crates/datafusion-common/8.0.0 +https://crates.io/crates/datafusion-data-access/8.0.0 +https://crates.io/crates/datafusion-expr/8.0.0 https://crates.io/crates/datafusion-jit/8.0.0 https://crates.io/crates/datafusion-physical-expr/8.0.0 https://crates.io/crates/datafusion-proto/8.0.0 
-https://crates.io/crates/datafusion-data-access/8.0.0 -https://crates.io/crates/ballista/0.7.0 -https://crates.io/crates/ballista-cli/0.7.0 -https://crates.io/crates/ballista-core/0.7.0 -https://crates.io/crates/ballista-executor/0.7.0 -https://crates.io/crates/ballista-scheduler/0.7.0 +https://crates.io/crates/datafusion-row/8.0.0 +https://crates.io/crates/datafusion-sql/8.0.0 ``` diff --git a/dev/release/crate-deps.dot b/dev/release/crate-deps.dot index 66dd71fb343b2..cd8e891c4da4a 100644 --- a/dev/release/crate-deps.dot +++ b/dev/release/crate-deps.dot @@ -23,6 +23,9 @@ digraph G { datafusion_expr -> datafusion_common + datafusion_sql -> datafusion_common + datafusion_sql -> datafusion_expr + datafusion_physical_expr -> datafusion_common datafusion_physical_expr -> datafusion_expr @@ -38,6 +41,7 @@ digraph G { datafusion -> datafusion_jit datafusion -> datafusion_physical_expr datafusion -> datafusion_row + datafusion -> datafusion_sql datafusion_proto -> datafusion diff --git a/dev/release/crate-deps.svg b/dev/release/crate-deps.svg index a4998cf8063e2..0dbe37ca91637 100644 --- a/dev/release/crate-deps.svg +++ b/dev/release/crate-deps.svg @@ -4,154 +4,178 @@ - + G - + datafusion_common - -datafusion_common + +datafusion_common datafusion_data_access - -datafusion_data_access + +datafusion_data_access datafusion_expr - -datafusion_expr + +datafusion_expr datafusion_expr->datafusion_common - - + + - + +datafusion_sql + +datafusion_sql + + + +datafusion_sql->datafusion_common + + + + + +datafusion_sql->datafusion_expr + + + + + datafusion_physical_expr - -datafusion_physical_expr + +datafusion_physical_expr - + datafusion_physical_expr->datafusion_common - - + + - + datafusion_physical_expr->datafusion_expr - - + + - + datafusion_jit - -datafusion_jit + +datafusion_jit - + datafusion_jit->datafusion_common - - + + - + datafusion_jit->datafusion_expr - - + + - + datafusion_row - -datafusion_row + +datafusion_row - + datafusion_row->datafusion_common - - + + - + 
datafusion_row->datafusion_jit - - + + - + datafusion - -datafusion + +datafusion - + datafusion->datafusion_common - - + + - + datafusion->datafusion_data_access - - + + - + datafusion->datafusion_expr - - + + + + + +datafusion->datafusion_sql + + - + datafusion->datafusion_physical_expr - - + + - + datafusion->datafusion_jit - - + + - + datafusion->datafusion_row - - + + - + datafusion_proto - -datafusion_proto + +datafusion_proto - + datafusion_proto->datafusion - - + + - + datafusion_cli - -datafusion_cli + +datafusion_cli - + datafusion_cli->datafusion - - + + diff --git a/dev/update_datafusion_versions.py b/dev/update_datafusion_versions.py index 2560f79dcfb26..b86ebc8e977d1 100755 --- a/dev/update_datafusion_versions.py +++ b/dev/update_datafusion_versions.py @@ -37,7 +37,8 @@ 'datafusion-jit': 'datafusion/jit/Cargo.toml', 'datafusion-physical-expr': 'datafusion/physical-expr/Cargo.toml', 'datafusion-proto': 'datafusion/proto/Cargo.toml', - 'datafusion-row': 'datafusion/row/Cargo.toml' + 'datafusion-row': 'datafusion/row/Cargo.toml', + 'datafusion-sql': 'datafusion/sql/Cargo.toml', } def update_datafusion_version(cargo_toml: str, new_version: str):