diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index d124fd94f6590..ea9fa765e6a36 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -360,6 +360,11 @@ impl ConfigOptions {
         self.set(key, ScalarValue::UInt64(Some(value)))
     }
 
+    /// set a `String` configuration option
+    pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
+        self.set(key, ScalarValue::Utf8(Some(value.into())))
+    }
+
     /// get a configuration option
     pub fn get(&self, key: &str) -> Option<ScalarValue> {
         self.options.get(key).cloned()
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index c66199e073b71..d4962f87359c3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -67,7 +67,7 @@ use crate::error::{DataFusionError, Result};
 use crate::logical_expr::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
     CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
-    TableSource, TableType, UNNAMED_TABLE,
+    SetVariable, TableSource, TableType, UNNAMED_TABLE,
 };
 use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
 use datafusion_sql::{ResolvedTableReference, TableReference};
@@ -341,6 +341,57 @@ impl SessionContext {
                     ))),
                 }
             }
+
+            LogicalPlan::SetVariable(SetVariable {
+                variable, value, ..
+            }) => {
+                // a shared lock on the session state is enough here: the options
+                // are mutated through their own lock below
+                let config_options = &self.state.read().config.config_options;
+
+                let old_value =
+                    config_options.read().get(&variable).ok_or_else(|| {
+                        DataFusionError::Execution(format!(
+                            "Can not SET variable: Unknown Variable {}",
+                            variable
+                        ))
+                    })?;
+
+                match old_value {
+                    ScalarValue::Boolean(_) => {
+                        let new_value = value.parse::<bool>().map_err(|_| {
+                            DataFusionError::Execution(format!(
+                                "Failed to parse {} as bool",
+                                value,
+                            ))
+                        })?;
+                        config_options.write().set_bool(&variable, new_value);
+                    }
+
+                    ScalarValue::UInt64(_) => {
+                        let new_value = value.parse::<u64>().map_err(|_| {
+                            DataFusionError::Execution(format!(
+                                "Failed to parse {} as u64",
+                                value,
+                            ))
+                        })?;
+                        config_options.write().set_u64(&variable, new_value);
+                    }
+
+                    ScalarValue::Utf8(_) => {
+                        // every value is a valid string, so nothing can fail to parse
+                        config_options.write().set_string(&variable, value.as_str());
+                    }
+
+                    _ => {
+                        return Err(DataFusionError::Execution(
+                            "Unsupported Scalar Value Type".to_string(),
+                        ))
+                    }
+                }
+                self.return_empty_dataframe()
+            }
+
             LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
                 schema_name,
                 if_not_exists,
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index a389bb65bdc7e..4d5980a45311d 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1097,6 +1097,11 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateView".to_string(),
                     ))
                 }
+                LogicalPlan::SetVariable(_) => {
+                    Err(DataFusionError::Internal(
+                        "Unsupported logical plan: SetVariable must be root of the plan".to_string(),
+                    ))
+                }
                 LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
                     "Unsupported logical plan: Explain must be root of the plan".to_string(),
                 )),
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index ede9fafa77212..1562574dcfbe0 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -109,6 +109,7 @@ pub mod idenfifers;
 pub mod information_schema;
 pub mod parquet_schema;
 pub mod partitioned_csv;
+pub mod set_variable;
 pub mod subqueries;
 #[cfg(feature = "unicode_expressions")]
 pub mod unicode;
diff --git a/datafusion/core/tests/sql/set_variable.rs b/datafusion/core/tests/sql/set_variable.rs
new file mode 100644
index 0000000000000..2bbe49120bd09
--- /dev/null
+++ b/datafusion/core/tests/sql/set_variable.rs
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::*;
+
+#[tokio::test]
+async fn set_variable_to_value() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size to 1")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_to_value_with_equal_sign() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size = 1")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_to_value_with_single_quoted_string() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size to '1'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_to_value_case_insensitive() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.EXECUTION.batch_size to '1'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_unknown_variable() {
+    let ctx = SessionContext::new();
+
+    let err = plan_and_collect(&ctx, "SET aabbcc to '1'")
+        .await
+        .unwrap_err();
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Can not SET variable: Unknown Variable aabbcc"
+    );
+}
+
+#[tokio::test]
+async fn set_bool_variable() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.coalesce_batches to true")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------------+---------+",
+        "| name                                  | setting |",
+        "+---------------------------------------+---------+",
+        "| datafusion.execution.coalesce_batches | true    |",
+        "+---------------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+
+    ctx.sql("SET datafusion.execution.coalesce_batches to 'false'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------------+---------+",
+        "| name                                  | setting |",
+        "+---------------------------------------+---------+",
+        "| datafusion.execution.coalesce_batches | false   |",
+        "+---------------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_bool_variable_bad_value() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to 1")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse 1 as bool"
+    );
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to abc")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse abc as bool"
+    );
+}
+
+#[tokio::test]
+async fn set_u64_variable() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size to 0")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 0       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+
+    ctx.sql("SET datafusion.execution.batch_size to '1'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+
+    ctx.sql("SET datafusion.execution.batch_size to +2")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 2       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_u64_variable_bad_value() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to -1")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse -1 as u64"
+    );
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to abc")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse abc as u64"
+    );
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to 0.1")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse 0.1 as u64"
+    );
+}
+
+#[tokio::test]
+async fn set_time_zone() {
+    // we don't support changing time zone for now until all time zone issues fixed and related function completed
+
+    let ctx = SessionContext::new();
+
+    // for full variable name
+    let err = plan_and_collect(&ctx, "set datafusion.execution.time_zone = '8'")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Changing Time Zone isn't supported yet"
+    );
+
+    // for alias time zone
+    let err = plan_and_collect(&ctx, "set time zone = '8'")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Changing Time Zone isn't supported yet"
+    );
+
+    // for alias timezone
+    let err = plan_and_collect(&ctx, "set timezone = '8'")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Changing Time Zone isn't supported yet"
+    );
+}
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 874a80713a673..45c281551d186 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -72,7 +72,7 @@ pub use logical_plan::{
     CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
     EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
     LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, PlanVisitor, Projection,
-    Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
+    Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
     ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window,
 };
 pub use nullif::SUPPORTED_NULLIF_TYPES;
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 8e4dfc0eb32ac..2cfe921e67b37 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -25,9 +25,9 @@ pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
     EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
-    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
-    StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
-    Values, Window,
+    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
+    SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
+    ToStringifiedPlan, Union, Values, Window,
 };
 
 pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ce169f6ec253d..27586d702f99e 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -106,6 +106,8 @@ pub enum LogicalPlan {
     Extension(Extension),
     /// Remove duplicate rows from the input
     Distinct(Distinct),
+    /// Set a Variable
+    SetVariable(SetVariable),
 }
 
 impl LogicalPlan {
@@ -144,6 +146,7 @@ impl LogicalPlan {
             LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
             LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
             LogicalPlan::DropView(DropView { schema, .. }) => schema,
+            LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema,
         }
     }
 
@@ -200,7 +203,9 @@ impl LogicalPlan {
             | LogicalPlan::CreateView(CreateView { input, .. })
             | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
             LogicalPlan::Distinct(Distinct { input, .. }) => input.all_schemas(),
-            LogicalPlan::DropTable(_) | LogicalPlan::DropView(_) => vec![],
+            LogicalPlan::DropTable(_)
+            | LogicalPlan::DropView(_)
+            | LogicalPlan::SetVariable(_) => vec![],
         }
     }
 
@@ -260,6 +265,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
+            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_)
             | LogicalPlan::CrossJoin(_)
             | LogicalPlan::Analyze { .. }
@@ -305,6 +311,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
+            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_) => vec![],
         }
     }
@@ -455,6 +462,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
+            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_) => true,
         };
         if !recurse {
@@ -939,6 +947,11 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "DropView: {:?} if not exist:={}", name, if_exists)
                     }
+                    LogicalPlan::SetVariable(SetVariable {
+                        variable, value, ..
+                    }) => {
+                        write!(f, "SetVariable: set {:?} to {:?}", variable, value)
+                    }
                     LogicalPlan::Distinct(Distinct { .. }) => {
                         write!(f, "Distinct:")
                     }
@@ -1055,6 +1068,17 @@ pub struct DropView {
     pub schema: DFSchemaRef,
 }
 
+/// Set a Variable -- value in [`ConfigOptions`]
+#[derive(Clone)]
+pub struct SetVariable {
+    /// The variable name
+    pub variable: String,
+    /// The value to set
+    pub value: String,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
+
 /// Produces no rows: An empty relation with an empty schema
 #[derive(Clone)]
 pub struct EmptyRelation {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index be4c45f8c9310..47939b73348df 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -584,6 +584,7 @@ pub fn from_plan(
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::DropView(_)
+        | LogicalPlan::SetVariable(_)
         | LogicalPlan::CreateCatalogSchema(_)
         | LogicalPlan::CreateCatalog(_) => {
             // All of these plan types have no inputs / exprs so should not be called
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index daa164aec2698..6ab1dd1fc9ac5 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -226,6 +226,7 @@ fn optimize(
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::DropView(_)
+        | LogicalPlan::SetVariable(_)
         | LogicalPlan::Distinct(_)
         | LogicalPlan::Extension { .. } => {
             // apply the optimization to all inputs of the plan
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index b2e776821fb15..9bd1ef4433b60 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -485,6 +485,7 @@ fn optimize_plan(
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::DropView(_)
+        | LogicalPlan::SetVariable(_)
         | LogicalPlan::CrossJoin(_)
         | LogicalPlan::Distinct(_)
         | LogicalPlan::Extension { .. } => {
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 13aaceb793558..552045044e602 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -1351,6 +1351,9 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::DropView(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for DropView",
             )),
+            LogicalPlan::SetVariable(_) => Err(proto_error(
+                "LogicalPlan serde is not yet implemented for SetVariable",
+            )),
         }
     }
 }
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 51798cc792393..0ea08a3d50ef4 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -27,7 +27,7 @@ use datafusion_expr::logical_plan::{
     Analyze, CreateCatalog, CreateCatalogSchema,
     CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
     DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder,
-    Partitioning, PlanType, ToStringifiedPlan,
+    Partitioning, PlanType, SetVariable, ToStringifiedPlan,
 };
 use datafusion_expr::utils::{
     can_hash, expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
@@ -161,6 +161,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             } => self.explain_statement_to_plan(verbose, analyze, *statement),
             Statement::Query(query) => self.query_to_plan(*query, &mut HashMap::new()),
             Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
+            Statement::SetVariable {
+                local,
+                hivevar,
+                variable,
+                value,
+            } => self.set_variable_to_plan(local, hivevar, &variable, value),
+
             Statement::CreateTable {
                 query: Some(query),
                 name,
@@ -2451,6 +2458,94 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         self.statement_to_plan(rewrite.pop_front().unwrap())
     }
 
+    /// Plan a `SET <variable> = <value>` statement as a [`LogicalPlan::SetVariable`].
+    ///
+    /// `LOCAL`, `HIVEVAR` and time zone changes are rejected. The variable name is
+    /// lower-cased and the value is kept as its string form.
+    fn set_variable_to_plan(
+        &self,
+        local: bool,
+        hivevar: bool,
+        variable: &ObjectName,
+        value: Vec<SQLExpr>,
+    ) -> Result<LogicalPlan> {
+        if local {
+            return Err(DataFusionError::NotImplemented(
+                "LOCAL is not supported".to_string(),
+            ));
+        }
+
+        if hivevar {
+            return Err(DataFusionError::NotImplemented(
+                "HIVEVAR is not supported".to_string(),
+            ));
+        }
+
+        let variable = variable.to_string();
+        let mut variable_lower = variable.to_lowercase();
+
+        if variable_lower == "timezone" || variable_lower == "time.zone" {
+            // we could introduce alias in OptionDefinition if this string matching thing grows
+            variable_lower = "datafusion.execution.time_zone".to_string();
+        }
+
+        // we don't support change time zone until we complete time zone related implementation
+        if variable_lower == "datafusion.execution.time_zone" {
+            return Err(DataFusionError::Plan(
+                "Changing Time Zone isn't supported yet".to_string(),
+            ));
+        }
+
+        // only the first value is used; an empty list becomes a planning error
+        // rather than an out-of-bounds panic
+        let value_expr = value
+            .first()
+            .ok_or_else(|| DataFusionError::Plan("SET requires a value".to_string()))?;
+
+        // parse value string from Expr
+        let value_string = match value_expr {
+            SQLExpr::Identifier(i) => i.to_string(),
+            SQLExpr::Value(v) => match v {
+                Value::SingleQuotedString(s) => s.to_string(),
+                Value::Number(_, _) | Value::Boolean(_) => v.to_string(),
+                Value::DoubleQuotedString(_)
+                | Value::EscapedStringLiteral(_)
+                | Value::NationalStringLiteral(_)
+                | Value::HexStringLiteral(_)
+                | Value::Null
+                | Value::Placeholder(_) => {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unsupported Value {}",
+                        value_expr
+                    )))
+                }
+            },
+            // for capture signed number e.g. +8, -8
+            SQLExpr::UnaryOp { op, expr } => match op {
+                UnaryOperator::Plus => format!("+{}", expr),
+                UnaryOperator::Minus => format!("-{}", expr),
+                _ => {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unsupported Value {}",
+                        value_expr
+                    )))
+                }
+            },
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "Unsupported Value {}",
+                    value_expr
+                )))
+            }
+        };
+
+        Ok(LogicalPlan::SetVariable(SetVariable {
+            variable: variable_lower,
+            value: value_string,
+            schema: DFSchemaRef::new(DFSchema::empty()),
+        }))
+    }
+
     fn show_columns_to_plan(
         &self,
         extended: bool,