diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index c0a0134fedfb1..c97f770ab3013 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -28,7 +28,11 @@ use crate::{ optimizer::optimizer::Optimizer, physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule}, }; -use datafusion_common::{alias::AliasGenerator, not_impl_err, plan_err}; +use datafusion_common::{ + alias::AliasGenerator, + not_impl_err, plan_err, + tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}, +}; use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, @@ -163,12 +167,14 @@ where /// * Register a custom data source that can be referenced from a SQL query. /// * Execution a SQL query /// +/// # Example: DataFrame API +/// /// The following example demonstrates how to use the context to execute a query against a CSV /// data source using the DataFrame API: /// /// ``` /// use datafusion::prelude::*; -/// # use datafusion::error::Result; +/// # use datafusion::{error::Result, assert_batches_eq}; /// # #[tokio::main] /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); @@ -176,22 +182,49 @@ where /// let df = df.filter(col("a").lt_eq(col("b")))? /// .aggregate(vec![col("a")], vec![min(col("b"))])? /// .limit(0, Some(100))?; -/// let results = df.collect(); +/// let results = df +/// .collect() +/// .await?; +/// assert_batches_eq!( +/// &[ +/// "+---+----------------+", +/// "| a | MIN(?table?.b) |", +/// "+---+----------------+", +/// "| 1 | 2 |", +/// "+---+----------------+", +/// ], +/// &results +/// ); /// # Ok(()) /// # } /// ``` /// +/// # Example: SQL API +/// /// The following example demonstrates how to execute the same query using SQL: /// /// ``` /// use datafusion::prelude::*; -/// -/// # use datafusion::error::Result; +/// # use datafusion::{error::Result, assert_batches_eq}; /// # #[tokio::main] /// # async fn main() -> Result<()> { /// let mut ctx = SessionContext::new(); /// ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?; -/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?; +/// let results = ctx +/// .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100") +/// .await? +/// .collect() +/// .await?; +/// assert_batches_eq!( +/// &[ +/// "+---+----------------+", +/// "| a | MIN(example.b) |", +/// "+---+----------------+", +/// "| 1 | 2 |", +/// "+---+----------------+", +/// ], +/// &results +/// ); /// # Ok(()) /// # } /// ``` @@ -342,22 +375,82 @@ impl SessionContext { self.state.read().config.clone() } - /// Creates a [`DataFrame`] that will execute a SQL query. + /// Creates a [`DataFrame`] from SQL query text. /// /// Note: This API implements DDL statements such as `CREATE TABLE` and /// `CREATE VIEW` and DML statements such as `INSERT INTO` with in-memory - /// default implementations. + /// default implementations. See [`Self::sql_with_options`]. + /// + /// # Example: Running SQL queries + /// + /// See the example on [`Self`] /// - /// If this is not desirable, consider using [`SessionState::create_logical_plan()`] which - /// does not mutate the state based on such statements. + /// # Example: Creating a Table with SQL + /// + /// ``` + /// use datafusion::prelude::*; + /// # use datafusion::{error::Result, assert_batches_eq}; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let mut ctx = SessionContext::new(); + /// ctx + /// .sql("CREATE TABLE foo (x INTEGER)") + /// .await? + /// .collect() + /// .await?; + /// assert!(ctx.table_exist("foo").unwrap()); + /// # Ok(()) + /// # } + /// ``` pub async fn sql(&self, sql: &str) -> Result { - // create a query planner + self.sql_with_options(sql, SQLOptions::new()).await + } + + /// Creates a [`DataFrame`] from SQL query text, first validating + /// that the queries are allowed by `options` + /// + /// # Example: Preventing Creating a Table with SQL + /// + /// If you want to avoid creating tables, or modifying data or the + /// session, set [`SQLOptions`] appropriately: + /// + /// ``` + /// use datafusion::prelude::*; + /// # use datafusion::{error::Result}; + /// # use datafusion::physical_plan::collect; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let mut ctx = SessionContext::new(); + /// let options = SQLOptions::new() + /// .with_allow_ddl(false); + /// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options) + /// .await + /// .unwrap_err(); + /// assert_eq!( + /// err.to_string(), + /// "Error during planning: DDL not supported: CreateMemoryTable" + /// ); + /// # Ok(()) + /// # } + /// ``` + pub async fn sql_with_options( + &self, + sql: &str, + options: SQLOptions, + ) -> Result { let plan = self.state().create_logical_plan(sql).await?; + options.verify_plan(&plan)?; self.execute_logical_plan(plan).await } - /// Execute the [`LogicalPlan`], return a [`DataFrame`] + /// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API + /// is not featured limited (so all SQL such as `CREATE TABLE` and + /// `COPY` will be run). + /// + /// If you wish to limit the type of plan that can be run from + /// SQL, see [`Self::sql_with_options`] and + /// [`SQLOptions::verify_plan`]. pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result { match plan { LogicalPlan::Ddl(ddl) => match ddl { @@ -1304,7 +1397,7 @@ impl FunctionRegistry for SessionContext { /// A planner used to add extensions to DataFusion logical and physical plans. #[async_trait] pub trait QueryPlanner { - /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution + /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution async fn create_physical_plan( &self, logical_plan: &LogicalPlan, @@ -1317,7 +1410,7 @@ struct DefaultQueryPlanner {} #[async_trait] impl QueryPlanner for DefaultQueryPlanner { - /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution + /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution async fn create_physical_plan( &self, logical_plan: &LogicalPlan, @@ -1628,7 +1721,8 @@ impl SessionState { &mut self.table_factories } - /// Convert a SQL string into an AST Statement + /// Parse an SQL string into an DataFusion specific AST + /// [`Statement`]. See [`SessionContext::sql`] for running queries. pub fn sql_to_statement( &self, sql: &str, @@ -1787,9 +1881,15 @@ impl SessionState { query.statement_to_plan(statement) } - /// Creates a [`LogicalPlan`] from the provided SQL string + /// Creates a [`LogicalPlan`] from the provided SQL string. This + /// interface will plan any SQL DataFusion supports, including DML + /// like `CREATE TABLE`, and `COPY` (which can write to local + /// files. /// - /// See [`SessionContext::sql`] for a higher-level interface that also handles DDL + /// See [`SessionContext::sql`] and + /// [`SessionContext::sql_with_options`] for a higher-level + /// interface that handles DDL and verification of allowed + /// statements. pub async fn create_logical_plan(&self, sql: &str) -> Result { let dialect = self.config.options().sql_parser.dialect.as_str(); let statement = self.sql_to_statement(sql, dialect)?; @@ -1870,7 +1970,11 @@ impl SessionState { /// Creates a physical plan from a logical plan. /// - /// Note: this first calls [`Self::optimize`] on the provided plan + /// Note: this first calls [`Self::optimize`] on the provided + /// plan. + /// + /// This function will error for [`LogicalPlan`]s such as catalog + /// DDL `CREATE TABLE` must be handled by another layer. pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan, @@ -2095,6 +2199,92 @@ impl SerializerRegistry for EmptySerializerRegistry { } } +/// Describes which SQL statements can be run. +/// +/// See [`SessionContext::sql_with_options`] for more details. +#[derive(Clone, Debug, Copy)] +pub struct SQLOptions { + /// See [`Self::with_allow_ddl`] + allow_ddl: bool, + /// See [`Self::with_allow_dml`] + allow_dml: bool, + /// See [`Self::with_allow_statements`] + allow_statements: bool, +} + +impl Default for SQLOptions { + fn default() -> Self { + Self { + allow_ddl: true, + allow_dml: true, + allow_statements: true, + } + } +} + +impl SQLOptions { + /// Create a new `SQLOptions` with default values + pub fn new() -> Self { + Default::default() + } + + /// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`. + pub fn with_allow_ddl(mut self, allow: bool) -> Self { + self.allow_ddl = allow; + self + } + + /// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true` + pub fn with_allow_dml(mut self, allow: bool) -> Self { + self.allow_dml = allow; + self + } + + /// Should Statements such as (e.g. `SET VARIABLE and `BEGIN TRANSACTION` ...`) be run?. Defaults to `true` + pub fn with_allow_statements(mut self, allow: bool) -> Self { + self.allow_statements = allow; + self + } + + /// Return an error if the [`LogicalPlan`] has any nodes that are + /// incompatible with this [`SQLOptions`]. + pub fn verify_plan(&self, plan: &LogicalPlan) -> Result<()> { + plan.visit(&mut BadPlanVisitor::new(self))?; + Ok(()) + } +} + +struct BadPlanVisitor<'a> { + options: &'a SQLOptions, +} +impl<'a> BadPlanVisitor<'a> { + fn new(options: &'a SQLOptions) -> Self { + Self { options } + } +} + +impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> { + type N = LogicalPlan; + + fn pre_visit(&mut self, node: &Self::N) -> Result { + match node { + LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => { + plan_err!("DDL not supported: {}", ddl.name()) + } + LogicalPlan::Dml(dml) if !self.options.allow_dml => { + plan_err!("DML not supported: {}", dml.op) + } + LogicalPlan::Copy(_) if !self.options.allow_dml => { + plan_err!("DML not supported: COPY") + } + LogicalPlan::Statement(stmt) if !self.options.allow_statements => { + plan_err!("Statement not supported: {}", stmt.name()) + } + _ => Ok(VisitRecursion::Continue), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -2646,43 +2836,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn unsupported_sql_returns_error() -> Result<()> { - let ctx = SessionContext::new(); - ctx.register_table("test", test::table_with_sequence(1, 1).unwrap()) - .unwrap(); - let state = ctx.state(); - - // create view - let sql = "create view test_view as select * from test"; - let plan = state.create_logical_plan(sql).await; - let physical_plan = state.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); - assert_eq!( - format!("{}", physical_plan.unwrap_err()), - "This feature is not implemented: Unsupported logical plan: CreateView" - ); - // // drop view - let sql = "drop view test_view"; - let plan = state.create_logical_plan(sql).await; - let physical_plan = state.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); - assert_eq!( - format!("{}", physical_plan.unwrap_err()), - "This feature is not implemented: Unsupported logical plan: DropView" - ); - // // drop table - let sql = "drop table test"; - let plan = state.create_logical_plan(sql).await; - let physical_plan = state.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); - assert_eq!( - format!("{}", physical_plan.unwrap_err()), - "This feature is not implemented: Unsupported logical plan: DropTable" - ); - Ok(()) - } - struct MyPhysicalPlanner {} #[async_trait] diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index d01d9c2390d40..3782feca191a1 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -26,7 +26,7 @@ //! ``` pub use crate::dataframe::DataFrame; -pub use crate::execution::context::{SessionConfig, SessionContext}; +pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext}; pub use crate::execution::options::{ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, }; diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index c1adcf9d0a966..35423234db88b 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -96,6 +96,7 @@ pub mod projection; pub mod references; pub mod repartition; pub mod select; +mod sql_api; pub mod subqueries; pub mod timestamp; pub mod udf; diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs new file mode 100644 index 0000000000000..4f249a8656da6 --- /dev/null +++ b/datafusion/core/tests/sql/sql_api.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::prelude::*; +use tempfile::TempDir; + +#[tokio::test] +async fn unsupported_ddl_returns_error() { + // Verify SessionContext::with_sql_options errors appropriately + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE test (x int)").await.unwrap(); + + // disallow ddl + let options = SQLOptions::new().with_allow_ddl(false); + + let sql = "create view test_view as select * from test"; + let df = ctx.sql_with_options(sql, options).await; + assert_eq!( + df.unwrap_err().to_string(), + "Error during planning: DDL not supported: CreateView" + ); + + // allow ddl + let options = options.with_allow_ddl(true); + ctx.sql_with_options(sql, options).await.unwrap(); +} + +#[tokio::test] +async fn unsupported_dml_returns_error() { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE test (x int)").await.unwrap(); + + let options = SQLOptions::new().with_allow_dml(false); + + let sql = "insert into test values (1)"; + let df = ctx.sql_with_options(sql, options).await; + assert_eq!( + df.unwrap_err().to_string(), + "Error during planning: DML not supported: Insert Into" + ); + + let options = options.with_allow_dml(true); + ctx.sql_with_options(sql, options).await.unwrap(); +} + +#[tokio::test] +async fn unsupported_copy_returns_error() { + let tmpdir = TempDir::new().unwrap(); + let tmpfile = tmpdir.path().join("foo.parquet"); + + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE test (x int)").await.unwrap(); + + let options = SQLOptions::new().with_allow_dml(false); + + let sql = format!("copy (values(1)) to '{}'", tmpfile.to_string_lossy()); + let df = ctx.sql_with_options(&sql, options).await; + assert_eq!( + df.unwrap_err().to_string(), + "Error during planning: DML not supported: COPY" + ); + + let options = options.with_allow_dml(true); + ctx.sql_with_options(&sql, options).await.unwrap(); +} + +#[tokio::test] +async fn unsupported_statement_returns_error() { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE test (x int)").await.unwrap(); + + let options = SQLOptions::new().with_allow_statements(false); + + let sql = "set datafusion.execution.batch_size = 5"; + let df = ctx.sql_with_options(sql, options).await; + assert_eq!( + df.unwrap_err().to_string(), + "Error during planning: Statement not supported: SetVariable" + ); + + let options = options.with_allow_statements(true); + ctx.sql_with_options(sql, options).await.unwrap(); +} + +#[tokio::test] +async fn ddl_can_not_be_planned_by_session_state() { + let ctx = SessionContext::new(); + + // make a table via SQL + ctx.sql("CREATE TABLE test (x int)").await.unwrap(); + + let state = ctx.state(); + + // can not create a logical plan for catalog DDL + let sql = "drop table test"; + let plan = state.create_logical_plan(sql).await.unwrap(); + let physical_plan = state.create_physical_plan(&plan).await; + assert_eq!( + physical_plan.unwrap_err().to_string(), + "This feature is not implemented: Unsupported logical plan: DropTable" + ); +}