From fadef3fac4365ad8d3c04c785ba3faaf809173a2 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 07:31:57 +0800 Subject: [PATCH 01/28] Make some functions public --- rust/datafusion/src/sql/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index f985b506536..03ba2a0c987 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -343,7 +343,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } /// Generate a logic plan from an SQL select - fn select_to_plan(&self, select: &Select) -> Result { + pub fn select_to_plan(&self, select: &Select) -> Result { let plans = self.plan_from_tables(&select.from)?; let plan = match &select.selection { From 97fb5cdc22e71842ec6dda1abf21d7640aa4f1b8 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 14:39:31 +0800 Subject: [PATCH 02/28] Support case sensitive --- rust/datafusion/src/execution/context.rs | 13 ++++++ rust/datafusion/src/logical_plan/expr.rs | 48 ++++++++++++++------ rust/datafusion/src/optimizer/utils.rs | 12 +++-- rust/datafusion/src/physical_plan/planner.rs | 4 +- rust/datafusion/src/physical_plan/udaf.rs | 1 + rust/datafusion/src/physical_plan/udf.rs | 1 + rust/datafusion/src/sql/planner.rs | 25 ++++++++-- rust/datafusion/src/sql/utils.rs | 11 +++-- 8 files changed, 89 insertions(+), 26 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 850ce745c8c..55ee445e7f7 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -502,6 +502,8 @@ pub struct ExecutionConfig { pub concurrency: usize, /// Default batch size when reading data sources pub batch_size: usize, + /// Case sensitive + pub case_sensitive: bool, /// Responsible for optimizing a logical plan optimizers: Vec>, /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` @@ -514,6 +516,7 @@ impl ExecutionConfig { Self { concurrency: num_cpus::get(), batch_size: 32768, + case_sensitive: true, optimizers: vec![ Arc::new(ConstantFolding::new()), Arc::new(ProjectionPushDown::new()), @@ -540,6 +543,12 @@ impl ExecutionConfig { self } + /// Customize case sensitive + pub fn with_case_sensitive(mut self, cs: bool) -> Self { + self.case_sensitive = cs; + self + } + /// Replace the default query planner pub fn with_query_planner( mut self, @@ -589,6 +598,10 @@ impl ContextProvider for ExecutionContextState { fn get_aggregate_meta(&self, name: &str) -> Option> { self.aggregate_functions.get(name).cloned() } + + fn get_config(&self) -> ExecutionConfig { + self.config.clone() + } } impl FunctionRegistry for ExecutionContextState { diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 775ab64ac14..daeb0f1a5df 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -160,6 +160,8 @@ pub enum Expr { }, /// Represents the call of a built-in scalar function with a set of arguments. ScalarFunction { + /// The input name of the function + input_name: String, /// The function fun: functions::BuiltinScalarFunction, /// List of expressions to feed to the functions as arguments @@ -167,6 +169,8 @@ pub enum Expr { }, /// Represents the call of a user-defined scalar function with arguments. ScalarUDF { + /// The input name of the function + input_name: String, /// The function fun: Arc, /// List of expressions to feed to the functions as arguments @@ -174,6 +178,8 @@ pub enum Expr { }, /// Represents the call of an aggregate built-in function with arguments. AggregateFunction { + /// The input name of the function + input_name: String, /// Name of the function fun: aggregates::AggregateFunction, /// List of expressions to feed to the functions as arguments @@ -183,6 +189,8 @@ pub enum Expr { }, /// aggregate function AggregateUDF { + /// The input name of the function + input_name: String, /// The function fun: Arc, /// List of expressions to feed to the functions as arguments @@ -220,21 +228,21 @@ impl Expr { Expr::Literal(l) => Ok(l.get_datatype()), Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema), Expr::Cast { data_type, .. } => Ok(data_type.clone()), - Expr::ScalarUDF { fun, args } => { + Expr::ScalarUDF { fun, args, .. } => { let data_types = args .iter() .map(|e| e.get_type(schema)) .collect::>>()?; Ok((fun.return_type)(&data_types)?.as_ref().clone()) } - Expr::ScalarFunction { fun, args } => { + Expr::ScalarFunction {fun, args, .. } => { let data_types = args .iter() .map(|e| e.get_type(schema)) .collect::>>()?; functions::return_type(fun, &data_types) } - Expr::AggregateFunction { fun, args, .. } => { + Expr::AggregateFunction { input_name, fun, args, .. } => { let data_types = args .iter() .map(|e| e.get_type(schema)) @@ -668,24 +676,29 @@ impl Expr { asc, nulls_first, }, - Expr::ScalarFunction { args, fun } => Expr::ScalarFunction { + Expr::ScalarFunction { input_name, args, fun} => Expr::ScalarFunction { + input_name, args: rewrite_vec(args, rewriter)?, fun, }, - Expr::ScalarUDF { args, fun } => Expr::ScalarUDF { + Expr::ScalarUDF { input_name, args, fun, .. } => Expr::ScalarUDF { + input_name, args: rewrite_vec(args, rewriter)?, fun, }, Expr::AggregateFunction { + input_name, args, fun, distinct, } => Expr::AggregateFunction { + input_name, args: rewrite_vec(args, rewriter)?, fun, distinct, }, - Expr::AggregateUDF { args, fun } => Expr::AggregateUDF { + Expr::AggregateUDF { input_name, args, fun} => Expr::AggregateUDF { + input_name, args: rewrite_vec(args, rewriter)?, fun, }, @@ -916,6 +929,7 @@ pub fn col(name: &str) -> Expr { /// Create an expression to represent the min() aggregate function pub fn min(expr: Expr) -> Expr { Expr::AggregateFunction { + input_name: aggregates::AggregateFunction::Min.to_string(), fun: aggregates::AggregateFunction::Min, distinct: false, args: vec![expr], @@ -925,6 +939,7 @@ pub fn min(expr: Expr) -> Expr { /// Create an expression to represent the max() aggregate function pub fn max(expr: Expr) -> Expr { Expr::AggregateFunction { + input_name: aggregates::AggregateFunction::Max.to_string(), fun: aggregates::AggregateFunction::Max, distinct: false, args: vec![expr], @@ -934,6 +949,7 @@ pub fn max(expr: Expr) -> Expr { /// Create an expression to represent the sum() aggregate function pub fn sum(expr: Expr) -> Expr { Expr::AggregateFunction { + input_name: aggregates::AggregateFunction::Sum.to_string(), fun: aggregates::AggregateFunction::Sum, distinct: false, args: vec![expr], @@ -943,6 +959,7 @@ pub fn sum(expr: Expr) -> Expr { /// Create an expression to represent the avg() aggregate function pub fn avg(expr: Expr) -> Expr { Expr::AggregateFunction { + input_name: aggregates::AggregateFunction::Avg.to_string(), fun: aggregates::AggregateFunction::Avg, distinct: false, args: vec![expr], @@ -952,6 +969,7 @@ pub fn avg(expr: Expr) -> Expr { /// Create an expression to represent the count() aggregate function pub fn count(expr: Expr) -> Expr { Expr::AggregateFunction { + input_name: aggregates::AggregateFunction::Count.to_string(), fun: aggregates::AggregateFunction::Count, distinct: false, args: vec![expr], @@ -961,6 +979,7 @@ pub fn count(expr: Expr) -> Expr { /// Create an expression to represent the count(distinct) aggregate function pub fn count_distinct(expr: Expr) -> Expr { Expr::AggregateFunction { + input_name: aggregates::AggregateFunction::Count.to_string(), fun: aggregates::AggregateFunction::Count, distinct: true, args: vec![expr], @@ -1034,6 +1053,7 @@ macro_rules! unary_scalar_expr { #[allow(missing_docs)] pub fn $FUNC(e: Expr) -> Expr { Expr::ScalarFunction { + input_name: functions::BuiltinScalarFunction::$ENUM.to_string(), fun: functions::BuiltinScalarFunction::$ENUM, args: vec![e], } @@ -1085,6 +1105,7 @@ unary_scalar_expr!(Upper, upper); /// returns an array of fixed size with each argument on it. pub fn array(args: Vec) -> Expr { Expr::ScalarFunction { + input_name: functions::BuiltinScalarFunction::Array.to_string(), fun: functions::BuiltinScalarFunction::Array, args, } @@ -1302,24 +1323,25 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { let expr = create_name(expr, input_schema)?; Ok(format!("{} IS NOT NULL", expr)) } - Expr::ScalarFunction { fun, args, .. } => { - create_function_name(&fun.to_string(), false, args, input_schema) + Expr::ScalarFunction { input_name, fun, args, .. } => { + create_function_name(input_name, false, args, input_schema) } - Expr::ScalarUDF { fun, args, .. } => { - create_function_name(&fun.name, false, args, input_schema) + Expr::ScalarUDF { input_name, fun, args, .. } => { + create_function_name(input_name, false, args, input_schema) } Expr::AggregateFunction { + input_name, fun, distinct, args, .. - } => create_function_name(&fun.to_string(), *distinct, args, input_schema), - Expr::AggregateUDF { fun, args } => { + } => create_function_name(input_name, *distinct, args, input_schema), + Expr::AggregateUDF { input_name, fun, args } => { let mut names = Vec::with_capacity(args.len()); for e in args { names.push(create_name(e, input_schema)?); } - Ok(format!("{}({})", fun.name, names.join(","))) + Ok(format!("{}({})", input_name, names.join(","))) } Expr::InList { expr, diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index c2552d77d2d..b59bf3bc988 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -295,20 +295,24 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { }), Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))), Expr::IsNotNull(_) => Ok(Expr::IsNotNull(Box::new(expressions[0].clone()))), - Expr::ScalarFunction { fun, .. } => Ok(Expr::ScalarFunction { + Expr::ScalarFunction { input_name, fun, .. } => Ok(Expr::ScalarFunction { + input_name: input_name.to_string(), fun: fun.clone(), args: expressions.to_vec(), }), - Expr::ScalarUDF { fun, .. } => Ok(Expr::ScalarUDF { + Expr::ScalarUDF { input_name, fun, .. } => Ok(Expr::ScalarUDF { + input_name: input_name.to_string(), fun: fun.clone(), args: expressions.to_vec(), }), - Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction { + Expr::AggregateFunction { input_name, fun, distinct, .. } => Ok(Expr::AggregateFunction { + input_name: input_name.to_string(), fun: fun.clone(), args: expressions.to_vec(), distinct: *distinct, }), - Expr::AggregateUDF { fun, .. } => Ok(Expr::AggregateUDF { + Expr::AggregateUDF { input_name, fun, .. } => Ok(Expr::AggregateUDF { + input_name: input_name.to_string(), fun: fun.clone(), args: expressions.to_vec(), }), diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index c83b639c232..0bd9487bf3a 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -553,14 +553,14 @@ impl DefaultPhysicalPlanner { Expr::IsNotNull(expr) => expressions::is_not_null( self.create_physical_expr(expr, input_schema, ctx_state)?, ), - Expr::ScalarFunction { fun, args } => { + Expr::ScalarFunction { fun, args, .. } => { let physical_args = args .iter() .map(|e| self.create_physical_expr(e, input_schema, ctx_state)) .collect::>>()?; functions::create_physical_expr(fun, &physical_args, input_schema) } - Expr::ScalarUDF { fun, args } => { + Expr::ScalarUDF { fun, args, .. } => { let mut physical_args = vec![]; for e in args { physical_args.push(self.create_physical_expr( diff --git a/rust/datafusion/src/physical_plan/udaf.rs b/rust/datafusion/src/physical_plan/udaf.rs index 3dc6aa402f5..aeaaf4605af 100644 --- a/rust/datafusion/src/physical_plan/udaf.rs +++ b/rust/datafusion/src/physical_plan/udaf.rs @@ -93,6 +93,7 @@ impl AggregateUDF { /// This utility allows using the UDAF without requiring access to the registry. pub fn call(&self, args: Vec) -> Expr { Expr::AggregateUDF { + input_name: self.name.to_string(), fun: Arc::new(self.clone()), args, } diff --git a/rust/datafusion/src/physical_plan/udf.rs b/rust/datafusion/src/physical_plan/udf.rs index 9189da47bd6..96085bc4ab6 100644 --- a/rust/datafusion/src/physical_plan/udf.rs +++ b/rust/datafusion/src/physical_plan/udf.rs @@ -82,6 +82,7 @@ impl ScalarUDF { /// This utility allows using the UDF without requiring access to the registry. pub fn call(&self, args: Vec) -> Expr { Expr::ScalarUDF { + input_name: self.name.to_string(), fun: Arc::new(self.clone()), args, } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 03ba2a0c987..3e1bdae0228 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -36,6 +36,7 @@ use crate::{ physical_plan::{aggregates, functions}, sql::parser::{CreateExternalTable, FileType, Statement as DFStatement}, }; +use crate::execution::context::ExecutionConfig; use arrow::datatypes::*; @@ -66,6 +67,8 @@ pub trait ContextProvider { fn get_function_meta(&self, name: &str) -> Option>; /// Getter for a UDAF description fn get_aggregate_meta(&self, name: &str) -> Option>; + /// Getter for a config + fn get_config(&self) -> ExecutionConfig; } /// SQL query planner @@ -727,6 +730,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))), SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction { + input_name: functions::BuiltinScalarFunction::DatePart.to_string(), fun: functions::BuiltinScalarFunction::DatePart, args: vec![ Expr::Literal(ScalarValue::Utf8(Some(format!("{}", field)))), @@ -927,7 +931,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Function(function) => { - let name: String = function.name.to_string(); + let name; + let input_name = function.name.to_string(); + + let case_sensitive = self.schema_provider.get_config().case_sensitive; + if case_sensitive == false { + name = input_name.to_lowercase(); + } else { + name = input_name.to_string(); + } // first, scalar built-in if let Ok(fun) = functions::BuiltinScalarFunction::from_str(&name) { @@ -937,7 +949,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - return Ok(Expr::ScalarFunction { fun, args }); + return Ok(Expr::ScalarFunction { input_name, fun, args }); }; // next, aggregate built-ins @@ -964,6 +976,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; return Ok(Expr::AggregateFunction { + input_name, fun, distinct: function.distinct, args, @@ -979,7 +992,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - Ok(Expr::ScalarUDF { fun: fm, args }) + Ok(Expr::ScalarUDF { input_name, fun: fm, args }) } None => match self.schema_provider.get_aggregate_meta(&name) { Some(fm) => { @@ -989,7 +1002,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - Ok(Expr::AggregateUDF { fun: fm, args }) + Ok(Expr::AggregateUDF { input_name, fun: fm, args }) } _ => Err(DataFusionError::Plan(format!( "Invalid function '{}'", @@ -2452,5 +2465,9 @@ mod tests { fn get_aggregate_meta(&self, _name: &str) -> Option> { unimplemented!() } + + fn get_config(&self) -> ExecutionConfig { + unimplemented!() + } } } diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs index ffcb772cd0e..7bb5e6a7991 100644 --- a/rust/datafusion/src/sql/utils.rs +++ b/rust/datafusion/src/sql/utils.rs @@ -206,10 +206,12 @@ where // clone_with_replacement() on any nested expressions. None => match expr { Expr::AggregateFunction { + input_name, fun, args, distinct, } => Ok(Expr::AggregateFunction { + input_name: input_name.to_string(), fun: fun.clone(), args: args .iter() @@ -217,7 +219,8 @@ where .collect::>>()?, distinct: *distinct, }), - Expr::AggregateUDF { fun, args } => Ok(Expr::AggregateUDF { + Expr::AggregateUDF { input_name, fun, args } => Ok(Expr::AggregateUDF { + input_name: input_name.to_string(), fun: fun.clone(), args: args .iter() @@ -285,14 +288,16 @@ where None => None, }, }), - Expr::ScalarFunction { fun, args } => Ok(Expr::ScalarFunction { + Expr::ScalarFunction { input_name, fun, args } => Ok(Expr::ScalarFunction { + input_name: input_name.clone(), fun: fun.clone(), args: args .iter() .map(|e| clone_with_replacement(e, replacement_fn)) .collect::>>()?, }), - Expr::ScalarUDF { fun, args } => Ok(Expr::ScalarUDF { + Expr::ScalarUDF { input_name, fun, args } => Ok(Expr::ScalarUDF { + input_name: input_name.clone(), fun: fun.clone(), args: args .iter() From 122ab74957ae67393fa995eed38a94f7a11e0141 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 16:14:28 +0800 Subject: [PATCH 03/28] remove to_string() --- rust/datafusion/src/logical_plan/expr.rs | 2 +- rust/datafusion/src/optimizer/utils.rs | 6 +++--- rust/datafusion/src/sql/planner.rs | 2 +- rust/datafusion/src/sql/utils.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index daeb0f1a5df..4e3022b805d 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -242,7 +242,7 @@ impl Expr { .collect::>>()?; functions::return_type(fun, &data_types) } - Expr::AggregateFunction { input_name, fun, args, .. } => { + Expr::AggregateFunction { fun, args, .. } => { let data_types = args .iter() .map(|e| e.get_type(schema)) diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index b59bf3bc988..c61c70e4bf3 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -301,18 +301,18 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { args: expressions.to_vec(), }), Expr::ScalarUDF { input_name, fun, .. } => Ok(Expr::ScalarUDF { - input_name: input_name.to_string(), + input_name: input_name.clone(), fun: fun.clone(), args: expressions.to_vec(), }), Expr::AggregateFunction { input_name, fun, distinct, .. } => Ok(Expr::AggregateFunction { - input_name: input_name.to_string(), + input_name: input_name.clone(), fun: fun.clone(), args: expressions.to_vec(), distinct: *distinct, }), Expr::AggregateUDF { input_name, fun, .. } => Ok(Expr::AggregateUDF { - input_name: input_name.to_string(), + input_name: input_name.clone(), fun: fun.clone(), args: expressions.to_vec(), }), diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 3e1bdae0228..61cabc6719a 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -938,7 +938,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if case_sensitive == false { name = input_name.to_lowercase(); } else { - name = input_name.to_string(); + name = input_name.clone(); } // first, scalar built-in diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs index 7bb5e6a7991..d3dad139404 100644 --- a/rust/datafusion/src/sql/utils.rs +++ b/rust/datafusion/src/sql/utils.rs @@ -220,7 +220,7 @@ where distinct: *distinct, }), Expr::AggregateUDF { input_name, fun, args } => Ok(Expr::AggregateUDF { - input_name: input_name.to_string(), + input_name: input_name.clone(), fun: fun.clone(), args: args .iter() From 6793160e756fb0311bf34ab5ebf17ae8d2d8ae68 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 16:37:35 +0800 Subject: [PATCH 04/28] remove unused variable fun --- rust/datafusion/src/logical_plan/expr.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 4e3022b805d..8e6be2f0c3e 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -1323,20 +1323,19 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { let expr = create_name(expr, input_schema)?; Ok(format!("{} IS NOT NULL", expr)) } - Expr::ScalarFunction { input_name, fun, args, .. } => { + Expr::ScalarFunction { input_name, args, .. } => { create_function_name(input_name, false, args, input_schema) } - Expr::ScalarUDF { input_name, fun, args, .. } => { + Expr::ScalarUDF { input_name, args, .. } => { create_function_name(input_name, false, args, input_schema) } Expr::AggregateFunction { input_name, - fun, distinct, args, .. } => create_function_name(input_name, *distinct, args, input_schema), - Expr::AggregateUDF { input_name, fun, args } => { + Expr::AggregateUDF { input_name, args , ..} => { let mut names = Vec::with_capacity(args.len()); for e in args { names.push(create_name(e, input_schema)?); From 33a1a9e5a3c24a20916b66720bf6e7f3919c1945 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 16:49:08 +0800 Subject: [PATCH 05/28] equality checks against false can be replaced by a negation --- rust/datafusion/src/sql/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 61cabc6719a..607c5073828 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -935,7 +935,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let input_name = function.name.to_string(); let case_sensitive = self.schema_provider.get_config().case_sensitive; - if case_sensitive == false { + if !case_sensitive { name = input_name.to_lowercase(); } else { name = input_name.clone(); From 33a1b9572b57349c6c0c5172baa1027e2c336f6c Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 17:08:00 +0800 Subject: [PATCH 06/28] format code --- rust/datafusion/src/logical_plan/expr.rs | 34 +++++++++++++++++++----- rust/datafusion/src/optimizer/utils.rs | 16 ++++++++--- rust/datafusion/src/sql/planner.rs | 16 ++++++++--- rust/datafusion/src/sql/utils.rs | 12 +++++++-- 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 8e6be2f0c3e..cae1d6b1e65 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -235,7 +235,7 @@ impl Expr { .collect::>>()?; Ok((fun.return_type)(&data_types)?.as_ref().clone()) } - Expr::ScalarFunction {fun, args, .. } => { + Expr::ScalarFunction { fun, args, .. } => { let data_types = args .iter() .map(|e| e.get_type(schema)) @@ -676,12 +676,21 @@ impl Expr { asc, nulls_first, }, - Expr::ScalarFunction { input_name, args, fun} => Expr::ScalarFunction { + Expr::ScalarFunction { + input_name, + args, + fun + } => Expr::ScalarFunction { input_name, args: rewrite_vec(args, rewriter)?, fun, }, - Expr::ScalarUDF { input_name, args, fun, .. } => Expr::ScalarUDF { + Expr::ScalarUDF { + input_name, + args, + fun, + .. + } => Expr::ScalarUDF { input_name, args: rewrite_vec(args, rewriter)?, fun, @@ -697,7 +706,11 @@ impl Expr { fun, distinct, }, - Expr::AggregateUDF { input_name, args, fun} => Expr::AggregateUDF { + Expr::AggregateUDF { + input_name, + args, + fun + } => Expr::AggregateUDF { input_name, args: rewrite_vec(args, rewriter)?, fun, @@ -1323,10 +1336,15 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { let expr = create_name(expr, input_schema)?; Ok(format!("{} IS NOT NULL", expr)) } - Expr::ScalarFunction { input_name, args, .. } => { + Expr::ScalarFunction { + input_name, + args, .. + } => { create_function_name(input_name, false, args, input_schema) } - Expr::ScalarUDF { input_name, args, .. } => { + Expr::ScalarUDF { + input_name, args, .. + } => { create_function_name(input_name, false, args, input_schema) } Expr::AggregateFunction { @@ -1335,7 +1353,9 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { args, .. } => create_function_name(input_name, *distinct, args, input_schema), - Expr::AggregateUDF { input_name, args , ..} => { + Expr::AggregateUDF { + input_name, args , .. + } => { let mut names = Vec::with_capacity(args.len()); for e in args { names.push(create_name(e, input_schema)?); diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index c61c70e4bf3..bef7d8f8c08 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -295,23 +295,31 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { }), Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))), Expr::IsNotNull(_) => Ok(Expr::IsNotNull(Box::new(expressions[0].clone()))), - Expr::ScalarFunction { input_name, fun, .. } => Ok(Expr::ScalarFunction { + Expr::ScalarFunction { + input_name, fun, .. + } => Ok(Expr::ScalarFunction { input_name: input_name.to_string(), fun: fun.clone(), args: expressions.to_vec(), }), - Expr::ScalarUDF { input_name, fun, .. } => Ok(Expr::ScalarUDF { + Expr::ScalarUDF { + input_name, fun, .. + } => Ok(Expr::ScalarUDF { input_name: input_name.clone(), fun: fun.clone(), args: expressions.to_vec(), }), - Expr::AggregateFunction { input_name, fun, distinct, .. } => Ok(Expr::AggregateFunction { + Expr::AggregateFunction { + input_name, fun, distinct, .. + } => Ok(Expr::AggregateFunction { input_name: input_name.clone(), fun: fun.clone(), args: expressions.to_vec(), distinct: *distinct, }), - Expr::AggregateUDF { input_name, fun, .. } => Ok(Expr::AggregateUDF { + Expr::AggregateUDF { + input_name, fun, .. + } => Ok(Expr::AggregateUDF { input_name: input_name.clone(), fun: fun.clone(), args: expressions.to_vec(), diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 607c5073828..72dfddb5bb7 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -949,7 +949,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - return Ok(Expr::ScalarFunction { input_name, fun, args }); + return Ok(Expr::ScalarFunction { + input_name, fun, args, + }); }; // next, aggregate built-ins @@ -992,7 +994,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - Ok(Expr::ScalarUDF { input_name, fun: fm, args }) + Ok(Expr::ScalarUDF { + input_name, + fun: fm, + args, + }) } None => match self.schema_provider.get_aggregate_meta(&name) { Some(fm) => { @@ -1002,7 +1008,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - Ok(Expr::AggregateUDF { input_name, fun: fm, args }) + Ok(Expr::AggregateUDF { + input_name, + fun: fm, + args, + }) } _ => Err(DataFusionError::Plan(format!( "Invalid function '{}'", diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs index d3dad139404..b27436cd135 100644 --- a/rust/datafusion/src/sql/utils.rs +++ b/rust/datafusion/src/sql/utils.rs @@ -288,7 +288,11 @@ where None => None, }, }), - Expr::ScalarFunction { input_name, fun, args } => Ok(Expr::ScalarFunction { + Expr::ScalarFunction { + input_name, + fun, + args, + } => Ok(Expr::ScalarFunction { input_name: input_name.clone(), fun: fun.clone(), args: args @@ -296,7 +300,11 @@ where .map(|e| clone_with_replacement(e, replacement_fn)) .collect::>>()?, }), - Expr::ScalarUDF { input_name, fun, args } => Ok(Expr::ScalarUDF { + Expr::ScalarUDF { + input_name, + fun, + args, + } => Ok(Expr::ScalarUDF { input_name: input_name.clone(), fun: fun.clone(), args: args From ea8ca2aefc648595177491f949a6f14eed8ed4e2 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 17:20:12 +0800 Subject: [PATCH 07/28] format code --- rust/datafusion/src/logical_plan/expr.rs | 17 ++++++----------- rust/datafusion/src/optimizer/utils.rs | 5 ++++- rust/datafusion/src/sql/planner.rs | 4 +++- rust/datafusion/src/sql/utils.rs | 6 +++++- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index cae1d6b1e65..0f379eb34c8 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -679,7 +679,7 @@ impl Expr { Expr::ScalarFunction { input_name, args, - fun + fun, } => Expr::ScalarFunction { input_name, args: rewrite_vec(args, rewriter)?, @@ -709,7 +709,7 @@ impl Expr { Expr::AggregateUDF { input_name, args, - fun + fun, } => Expr::AggregateUDF { input_name, args: rewrite_vec(args, rewriter)?, @@ -1337,16 +1337,11 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { Ok(format!("{} IS NOT NULL", expr)) } Expr::ScalarFunction { - input_name, - args, .. - } => { - create_function_name(input_name, false, args, input_schema) - } + input_name, args, .. + } => create_function_name(input_name, false, args, input_schema), Expr::ScalarUDF { input_name, args, .. - } => { - create_function_name(input_name, false, args, input_schema) - } + } => create_function_name(input_name, false, args, input_schema), Expr::AggregateFunction { input_name, distinct, @@ -1354,7 +1349,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { .. } => create_function_name(input_name, *distinct, args, input_schema), Expr::AggregateUDF { - input_name, args , .. + input_name, args, .. } => { let mut names = Vec::with_capacity(args.len()); for e in args { diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index bef7d8f8c08..cb2a8a83f89 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -310,7 +310,10 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { args: expressions.to_vec(), }), Expr::AggregateFunction { - input_name, fun, distinct, .. + input_name, + fun, + distinct, + .. } => Ok(Expr::AggregateFunction { input_name: input_name.clone(), fun: fun.clone(), diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 72dfddb5bb7..446dd8ea0c6 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -950,7 +950,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>()?; return Ok(Expr::ScalarFunction { - input_name, fun, args, + input_name, + fun, + args, }); }; diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs index b27436cd135..36beecfdd21 100644 --- a/rust/datafusion/src/sql/utils.rs +++ b/rust/datafusion/src/sql/utils.rs @@ -219,7 +219,11 @@ where .collect::>>()?, distinct: *distinct, }), - Expr::AggregateUDF { input_name, fun, args } => Ok(Expr::AggregateUDF { + Expr::AggregateUDF { + input_name, + fun, + args, + } => Ok(Expr::AggregateUDF { input_name: input_name.clone(), fun: fun.clone(), args: args From a2e5eac9e92ec822f5eed2168e8a61e36d5f8e71 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 17:37:42 +0800 Subject: [PATCH 08/28] format code --- rust/datafusion/src/sql/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 446dd8ea0c6..94fc8722ddc 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -21,6 +21,7 @@ use std::str::FromStr; use std::sync::Arc; use crate::datasource::TableProvider; +use crate::execution::context::ExecutionConfig; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, lit, DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, @@ -36,7 +37,6 @@ use crate::{ physical_plan::{aggregates, functions}, sql::parser::{CreateExternalTable, FileType, Statement as DFStatement}, }; -use crate::execution::context::ExecutionConfig; use arrow::datatypes::*; From f89797085e1782f1e706461c06e28142736df860 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sun, 28 Feb 2021 21:19:16 +0800 Subject: [PATCH 09/28] fix test --- rust/arrow-flight/src/arrow.flight.protocol.rs | 5 +++-- rust/datafusion/src/execution/context.rs | 4 ++-- rust/datafusion/src/sql/planner.rs | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/rust/arrow-flight/src/arrow.flight.protocol.rs b/rust/arrow-flight/src/arrow.flight.protocol.rs index 2a87d7b2683..8de767cb8a4 100644 --- a/rust/arrow-flight/src/arrow.flight.protocol.rs +++ b/rust/arrow-flight/src/arrow.flight.protocol.rs @@ -499,8 +499,9 @@ pub mod flight_service_server { #[async_trait] pub trait FlightService: Send + Sync + 'static { #[doc = "Server streaming response type for the Handshake method."] - type HandshakeStream: Stream> - + Send + type HandshakeStream: Stream< + Item = Result, + > + Send + Sync + 'static; #[doc = ""] diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 55ee445e7f7..e8b35f9fc0e 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -1372,7 +1372,7 @@ mod tests { let results = plan_and_collect( &mut ctx, - "SELECT dict, count(val) FROM t GROUP BY dict", + "SELECT dict, COUNT(val) FROM t GROUP BY dict", ) .await .expect("ran plan correctly"); @@ -1390,7 +1390,7 @@ mod tests { // Now, use dict as an aggregate let results = - plan_and_collect(&mut ctx, "SELECT val, count(dict) FROM t GROUP BY val") + plan_and_collect(&mut ctx, "SELECT val, COUNT(dict) FROM t GROUP BY val") .await .expect("ran plan correctly"); diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 94fc8722ddc..57469cb9e96 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -2479,7 +2479,7 @@ mod tests { } fn get_config(&self) -> ExecutionConfig { - unimplemented!() + ExecutionConfig::new() } } } From b135fe8d62b5f4db460f5ad4d91764d11500b305 Mon Sep 17 00:00:00 2001 From: wqc200 Date: Tue, 2 Mar 2021 20:16:40 +0800 Subject: [PATCH 10/28] Update rust/datafusion/src/sql/planner.rs Co-authored-by: Andrew Lamb --- rust/datafusion/src/sql/planner.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 57469cb9e96..5555db77efa 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -931,14 +931,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Function(function) => { - let name; let input_name = function.name.to_string(); - let case_sensitive = self.schema_provider.get_config().case_sensitive; - if !case_sensitive { - name = input_name.to_lowercase(); + let name = if !case_sensitive { + input_name.to_lowercase(); } else { - name = input_name.clone(); + input_name; } // first, scalar built-in From 209243964dc676948eeb5d601c0b81d460d9c473 Mon Sep 17 00:00:00 2001 From: wqc200 Date: Tue, 2 Mar 2021 20:18:26 +0800 Subject: [PATCH 11/28] Update rust/datafusion/src/execution/context.rs Co-authored-by: Andrew Lamb --- rust/datafusion/src/execution/context.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index e8b35f9fc0e..7669fb0014f 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -599,8 +599,8 @@ impl ContextProvider for ExecutionContextState { self.aggregate_functions.get(name).cloned() } - fn get_config(&self) -> ExecutionConfig { - self.config.clone() + fn config(&self) -> &ExecutionConfig { + self.config } } From fb1a41d1f8d05280b9badfbd9a76a9dfbba502f8 Mon Sep 17 00:00:00 2001 From: wqc200 Date: Tue, 2 Mar 2021 20:23:12 +0800 Subject: [PATCH 12/28] Update rust/datafusion/src/execution/context.rs Co-authored-by: Andrew Lamb --- rust/datafusion/src/execution/context.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 7669fb0014f..3b4f218abee 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -502,7 +502,11 @@ pub struct ExecutionConfig { pub concurrency: usize, /// Default batch size when reading data sources pub batch_size: usize, - /// Case sensitive + /// Will function names be searched using case-sensitive matching. + /// If `false` both `"SELECT COUNT(*) FROM t;` and "`SELECT count(*) FROM t;` + /// can be used to compute the `COUNT` aggregate. If `true` then only + /// `"SELECT count(*) FROM t"` can be used. + /// Defaults to `true` pub case_sensitive: bool, /// Responsible for optimizing a logical plan optimizers: Vec>, From 474cb6106841a6f62962712a17a67816d7e9dae1 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Tue, 2 Mar 2021 20:34:06 +0800 Subject: [PATCH 13/28] remove get_config to config --- rust/arrow-flight/src/arrow.flight.protocol.rs | 5 ++--- rust/datafusion/src/execution/context.rs | 2 +- rust/datafusion/src/sql/planner.rs | 7 ++++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/arrow-flight/src/arrow.flight.protocol.rs b/rust/arrow-flight/src/arrow.flight.protocol.rs index 8de767cb8a4..2a87d7b2683 100644 --- a/rust/arrow-flight/src/arrow.flight.protocol.rs +++ b/rust/arrow-flight/src/arrow.flight.protocol.rs @@ -499,9 +499,8 @@ pub mod flight_service_server { #[async_trait] pub trait FlightService: Send + Sync + 'static { #[doc = "Server streaming response type for the Handshake method."] - type HandshakeStream: Stream< - Item = Result, - > + Send + type HandshakeStream: Stream> + + Send + Sync + 'static; #[doc = ""] diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 7669fb0014f..cda307a2834 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -600,7 +600,7 @@ impl ContextProvider for ExecutionContextState { } fn config(&self) -> &ExecutionConfig { - self.config + &self.config } } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 5555db77efa..c93ba843b00 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -68,7 +68,7 @@ pub trait ContextProvider { /// Getter for a UDAF description fn get_aggregate_meta(&self, name: &str) -> Option>; /// Getter for a config - fn get_config(&self) -> ExecutionConfig; + fn config(&self) -> &ExecutionConfig; } /// SQL query planner @@ -2476,8 +2476,9 @@ mod tests { unimplemented!() } - fn get_config(&self) -> ExecutionConfig { - ExecutionConfig::new() + fn config(&self) -> &ExecutionConfig { + let ec = ExecutionConfig::new(); + &ec } } } From af1568f387c393ff95af9d538833af9d5c6cac4b Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Tue, 2 Mar 2021 22:25:15 +0800 Subject: [PATCH 14/28] fix bug about input_name --- rust/datafusion/src/sql/planner.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index c93ba843b00..239e5e77994 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -932,12 +932,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Function(function) => { let input_name = function.name.to_string(); - let case_sensitive = self.schema_provider.get_config().case_sensitive; + let case_sensitive = self.schema_provider.config().case_sensitive; let name = if !case_sensitive { - input_name.to_lowercase(); + input_name.to_lowercase() } else { - input_name; - } + input_name.clone() + }; // first, scalar built-in if let Ok(fun) = functions::BuiltinScalarFunction::from_str(&name) { From 8fa5b2931b96a9511a522d371858ae8b78637ced Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Wed, 3 Mar 2021 20:39:51 +0800 Subject: [PATCH 15/28] make config with arc --- rust/datafusion/src/execution/context.rs | 12 ++++++------ rust/datafusion/src/physical_plan/parquet.rs | 2 +- rust/datafusion/src/physical_plan/planner.rs | 2 +- rust/datafusion/src/sql/planner.rs | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index a6672cfa700..77d3905b9b9 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -105,11 +105,11 @@ pub struct ExecutionContext { impl ExecutionContext { /// Creates a new execution context using a default configuration. pub fn new() -> Self { - Self::with_config(ExecutionConfig::new()) + Self::with_config(Arc::new(ExecutionConfig::new())) } /// Creates a new execution context using the provided configuration. - pub fn with_config(config: ExecutionConfig) -> Self { + pub fn with_config(config: Arc) -> Self { Self { state: Arc::new(Mutex::new(ExecutionContextState { datasources: HashMap::new(), @@ -584,7 +584,7 @@ pub struct ExecutionContextState { /// Aggregate functions registered in the context pub aggregate_functions: HashMap>, /// Context configuration - pub config: ExecutionConfig, + pub config: Arc, } impl ContextProvider for ExecutionContextState { @@ -603,8 +603,8 @@ impl ContextProvider for ExecutionContextState { self.aggregate_functions.get(name).cloned() } - fn config(&self) -> &ExecutionConfig { - &self.config + fn config(&self) -> Arc { + self.config.clone() } } @@ -1907,7 +1907,7 @@ mod tests { #[tokio::test] async fn custom_query_planner() -> Result<()> { let mut ctx = ExecutionContext::with_config( - ExecutionConfig::new().with_query_planner(Arc::new(MyQueryPlanner {})), + Arc::new(ExecutionConfig::new().with_query_planner(Arc::new(MyQueryPlanner {}))), ); let df = ctx.sql("SELECT 1")?; diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index 6ab26c2c9e0..22540003ee2 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -322,7 +322,7 @@ impl RowGroupPredicateBuilder { scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), - config: ExecutionConfig::new(), + config: Arc::new(ExecutionConfig::new()), }; let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr( &logical_predicate_expr, diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 0bd9487bf3a..2a2404e88d3 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -758,7 +758,7 @@ mod tests { scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), - config: ExecutionConfig::new(), + config: Arc::new(ExecutionConfig::new()), } } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 239e5e77994..3d18cbd0306 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -68,7 +68,7 @@ pub trait ContextProvider { /// Getter for a UDAF description fn get_aggregate_meta(&self, name: &str) -> Option>; /// Getter for a config - fn config(&self) -> &ExecutionConfig; + fn config(&self) -> Arc; } /// SQL query planner @@ -2476,9 +2476,9 @@ mod tests { unimplemented!() } - fn config(&self) -> &ExecutionConfig { + fn config(&self) -> Arc { let ec = ExecutionConfig::new(); - &ec + Arc::new(ec) } } } From 890237868a1cb21951c5cbe882670627c15572bf Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Thu, 4 Mar 2021 22:36:40 +0800 Subject: [PATCH 16/28] add arc with config --- rust/benchmarks/src/bin/tpch.rs | 6 +++--- rust/datafusion/src/bin/repl.rs | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index f0f2e7cf57c..d86ad3c25ff 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -131,7 +131,7 @@ async fn benchmark(opt: BenchmarkOpt) -> Result Result<()> { .file_extension(".tbl"); let config = ExecutionConfig::new().with_batch_size(opt.batch_size); - let mut ctx = ExecutionContext::with_config(config); + let mut ctx = ExecutionContext::with_config(Arc::new(config)); // build plan to read the TBL file let mut csv = ctx.read_csv(&input_path, options)?; @@ -1602,7 +1602,7 @@ mod tests { let config = ExecutionConfig::new() .with_concurrency(1) .with_batch_size(10); - let mut ctx = ExecutionContext::with_config(config); + let mut ctx = ExecutionContext::with_config(Arc::new(config)); for &table in TABLES { let schema = get_schema(table); diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index 00e42615b90..4c39d04250d 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -24,6 +24,7 @@ use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; use rustyline::Editor; use std::env; use std::path::Path; +use std::sync::Arc; use std::time::Instant; #[tokio::main] @@ -62,7 +63,7 @@ pub async fn main() { .unwrap_or(1_048_576); let mut ctx = - ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size)); + ExecutionContext::with_config(Arc::new(ExecutionConfig::new().with_batch_size(batch_size))); let mut rl = Editor::<()>::new(); rl.load_history(".history").ok(); From 6578bfb15281e1295c45679c98892bb648f52096 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 6 Mar 2021 08:03:01 +0800 Subject: [PATCH 17/28] add arc with config --- rust/arrow/src/array/builder.rs | 2 ++ rust/benchmarks/src/bin/nyctaxi.rs | 3 ++- rust/datafusion/src/bin/repl.rs | 4 +++- rust/datafusion/src/execution/context.rs | 10 +++++----- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index 32eea9c9c7a..d770586d672 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -2693,6 +2693,8 @@ mod tests { let binary_array = builder.finish(); + let a = binary_array.data(); + assert_eq!(3, binary_array.len()); assert_eq!(0, binary_array.null_count()); assert_eq!([b'h', b'e', b'l', b'l', b'o'], binary_array.value(0)); diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs index 3391c38c8d3..fc95b035cdb 100644 --- a/rust/benchmarks/src/bin/nyctaxi.rs +++ b/rust/benchmarks/src/bin/nyctaxi.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::process; +use std::sync::Arc; use std::time::Instant; use arrow::datatypes::{DataType, Field, Schema}; @@ -71,7 +72,7 @@ async fn main() -> Result<()> { let config = ExecutionConfig::new() .with_concurrency(opt.concurrency) .with_batch_size(opt.batch_size); - let mut ctx = ExecutionContext::with_config(config); + let mut ctx = ExecutionContext::with_config(Arc::new(config)); let path = opt.path.to_str().unwrap(); diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index 4c39d04250d..8ae50480297 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -63,7 +63,9 @@ pub async fn main() { .unwrap_or(1_048_576); let mut ctx = - ExecutionContext::with_config(Arc::new(ExecutionConfig::new().with_batch_size(batch_size))); + ExecutionContext::with_config(Arc::new( + ExecutionConfig::new().with_batch_size(batch_size), + )); let mut rl = Editor::<()>::new(); rl.load_history(".history").ok(); diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 77d3905b9b9..f574a68801b 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -503,9 +503,9 @@ pub struct ExecutionConfig { /// Default batch size when reading data sources pub batch_size: usize, /// Will function names be searched using case-sensitive matching. - /// If `false` both `"SELECT COUNT(*) FROM t;` and "`SELECT count(*) FROM t;` + /// If `false` both `"SELECT COUNT(*) FROM t;` and "`SELECT count(*) FROM t;` /// can be used to compute the `COUNT` aggregate. If `true` then only - /// `"SELECT count(*) FROM t"` can be used. + /// `"SELECT count(*) FROM t"` can be used. /// Defaults to `true` pub case_sensitive: bool, /// Responsible for optimizing a logical plan @@ -1906,9 +1906,9 @@ mod tests { #[tokio::test] async fn custom_query_planner() -> Result<()> { - let mut ctx = ExecutionContext::with_config( - Arc::new(ExecutionConfig::new().with_query_planner(Arc::new(MyQueryPlanner {}))), - ); + let mut ctx = ExecutionContext::with_config(Arc::new( + ExecutionConfig::new().with_query_planner(Arc::new(MyQueryPlanner {})), + )); let df = ctx.sql("SELECT 1")?; df.collect().await.expect_err("query not supported"); From 7fe6ae8e4de80531a14423b0be9b5124f46da4dc Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 6 Mar 2021 08:09:13 +0800 Subject: [PATCH 18/28] add arc with config --- rust/datafusion/src/bin/repl.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index 8ae50480297..b6bdaa71376 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -62,10 +62,9 @@ pub async fn main() { .map(|size| size.parse::().unwrap()) .unwrap_or(1_048_576); - let mut ctx = - ExecutionContext::with_config(Arc::new( - ExecutionConfig::new().with_batch_size(batch_size), - )); + let mut ctx = ExecutionContext::with_config(Arc::new( + ExecutionConfig::new().with_batch_size(batch_size), + )); let mut rl = Editor::<()>::new(); rl.load_history(".history").ok(); From 72fb896cc55dcff7bb0e800518dc5edf50a907ce Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 6 Mar 2021 08:28:33 +0800 Subject: [PATCH 19/28] add arc with config --- rust/arrow/src/array/builder.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index d770586d672..32eea9c9c7a 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -2693,8 +2693,6 @@ mod tests { let binary_array = builder.finish(); - let a = binary_array.data(); - assert_eq!(3, binary_array.len()); assert_eq!(0, binary_array.null_count()); assert_eq!([b'h', b'e', b'l', b'l', b'o'], binary_array.value(0)); From bb10ef308b029eec2eaef2233264f2308d1fc77e Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 6 Mar 2021 11:09:31 +0800 Subject: [PATCH 20/28] rollback for arc --- rust/benchmarks/src/bin/nyctaxi.rs | 3 +-- rust/benchmarks/src/bin/tpch.rs | 6 +++--- rust/datafusion/src/bin/repl.rs | 6 ++---- rust/datafusion/src/execution/context.rs | 12 ++++++------ rust/datafusion/src/physical_plan/merge.rs | 2 +- rust/datafusion/src/physical_plan/parquet.rs | 2 +- rust/datafusion/src/physical_plan/planner.rs | 2 +- rust/datafusion/src/sql/planner.rs | 2 +- 8 files changed, 16 insertions(+), 19 deletions(-) diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs index fc95b035cdb..3391c38c8d3 100644 --- a/rust/benchmarks/src/bin/nyctaxi.rs +++ b/rust/benchmarks/src/bin/nyctaxi.rs @@ -20,7 +20,6 @@ use std::collections::HashMap; use std::path::PathBuf; use std::process; -use std::sync::Arc; use std::time::Instant; use arrow::datatypes::{DataType, Field, Schema}; @@ -72,7 +71,7 @@ async fn main() -> Result<()> { let config = ExecutionConfig::new() .with_concurrency(opt.concurrency) .with_batch_size(opt.batch_size); - let mut ctx = ExecutionContext::with_config(Arc::new(config)); + let mut ctx = ExecutionContext::with_config(config); let path = opt.path.to_str().unwrap(); diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 83808eaf7e1..b0a6cedd172 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -135,7 +135,7 @@ async fn benchmark(opt: BenchmarkOpt) -> Result Result<()> { .file_extension(".tbl"); let config = ExecutionConfig::new().with_batch_size(opt.batch_size); - let mut ctx = ExecutionContext::with_config(Arc::new(config)); + let mut ctx = ExecutionContext::with_config(config); // build plan to read the TBL file let mut csv = ctx.read_csv(&input_path, options)?; @@ -1606,7 +1606,7 @@ mod tests { let config = ExecutionConfig::new() .with_concurrency(1) .with_batch_size(10); - let mut ctx = ExecutionContext::with_config(Arc::new(config)); + let mut ctx = ExecutionContext::with_config(config); for &table in TABLES { let schema = get_schema(table); diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index b6bdaa71376..00e42615b90 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -24,7 +24,6 @@ use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; use rustyline::Editor; use std::env; use std::path::Path; -use std::sync::Arc; use std::time::Instant; #[tokio::main] @@ -62,9 +61,8 @@ pub async fn main() { .map(|size| size.parse::().unwrap()) .unwrap_or(1_048_576); - let mut ctx = ExecutionContext::with_config(Arc::new( - ExecutionConfig::new().with_batch_size(batch_size), - )); + let mut ctx = + ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size)); let mut rl = Editor::<()>::new(); rl.load_history(".history").ok(); diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index f574a68801b..54d6ad09826 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -105,11 +105,11 @@ pub struct ExecutionContext { impl ExecutionContext { /// Creates a new execution context using a default configuration. pub fn new() -> Self { - Self::with_config(Arc::new(ExecutionConfig::new())) + Self::with_config(ExecutionConfig::new()) } /// Creates a new execution context using the provided configuration. - pub fn with_config(config: Arc) -> Self { + pub fn with_config(config: ExecutionConfig) -> Self { Self { state: Arc::new(Mutex::new(ExecutionContextState { datasources: HashMap::new(), @@ -584,7 +584,7 @@ pub struct ExecutionContextState { /// Aggregate functions registered in the context pub aggregate_functions: HashMap>, /// Context configuration - pub config: Arc, + pub config: ExecutionConfig, } impl ContextProvider for ExecutionContextState { @@ -603,7 +603,7 @@ impl ContextProvider for ExecutionContextState { self.aggregate_functions.get(name).cloned() } - fn config(&self) -> Arc { + fn config(&self) -> ExecutionConfig { self.config.clone() } } @@ -1906,9 +1906,9 @@ mod tests { #[tokio::test] async fn custom_query_planner() -> Result<()> { - let mut ctx = ExecutionContext::with_config(Arc::new( + let mut ctx = ExecutionContext::with_config( ExecutionConfig::new().with_query_planner(Arc::new(MyQueryPlanner {})), - )); + ); let df = ctx.sql("SELECT 1")?; df.collect().await.expect_err("query not supported"); diff --git a/rust/datafusion/src/physical_plan/merge.rs b/rust/datafusion/src/physical_plan/merge.rs index 51541b6c564..0bfd11bdaa5 100644 --- a/rust/datafusion/src/physical_plan/merge.rs +++ b/rust/datafusion/src/physical_plan/merge.rs @@ -121,7 +121,7 @@ impl ExecutionPlan for MergeExec { // spawn independent tasks whose resulting streams (of batches) // are sent to the channel for consumption. - for part_i in (0..input_partitions) { + for part_i in 0..input_partitions { let input = self.input.clone(); let mut sender = sender.clone(); tokio::spawn(async move { diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index f6fe6767a1d..348a924040a 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -324,7 +324,7 @@ impl RowGroupPredicateBuilder { scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), - config: Arc::new(ExecutionConfig::new()), + config: ExecutionConfig::new(), }; let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr( &logical_predicate_expr, diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 2a2404e88d3..0bd9487bf3a 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -758,7 +758,7 @@ mod tests { scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), - config: Arc::new(ExecutionConfig::new()), + config: ExecutionConfig::new(), } } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 3d18cbd0306..b21930d441a 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -68,7 +68,7 @@ pub trait ContextProvider { /// Getter for a UDAF description fn get_aggregate_meta(&self, name: &str) -> Option>; /// Getter for a config - fn config(&self) -> Arc; + fn config(&self) -> ExecutionConfig; } /// SQL query planner From 2025c02d9338b3a2793620f16dd4a35ff7d3d114 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 6 Mar 2021 11:25:38 +0800 Subject: [PATCH 21/28] rollback for arc --- rust/datafusion/src/sql/planner.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index b21930d441a..540292dd927 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -2476,9 +2476,8 @@ mod tests { unimplemented!() } - fn config(&self) -> Arc { - let ec = ExecutionConfig::new(); - Arc::new(ec) + fn config(&self) -> ExecutionConfig { + ExecutionConfig::new() } } } From 49c10298899adae9b30bb0fac4e581bc8393836e Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Wed, 10 Mar 2021 00:38:07 +0800 Subject: [PATCH 22/28] The style of case display --- rust/datafusion/src/execution/context.rs | 20 ++++++++++++++++++-- rust/datafusion/src/sql/planner.rs | 14 ++++++++++++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 54d6ad09826..5bbf0a98bc2 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -495,6 +495,13 @@ impl QueryPlanner for DefaultQueryPlanner { } } +/// The style of case display +#[derive(Clone, PartialEq)] +pub enum CaseStyle { + LikeMySQL, + LikePostgreSQL +} + /// Configuration options for execution context #[derive(Clone)] pub struct ExecutionConfig { @@ -508,6 +515,8 @@ pub struct ExecutionConfig { /// `"SELECT count(*) FROM t"` can be used. /// Defaults to `true` pub case_sensitive: bool, + /// Default to like PostgreSQL + pub case_style: CaseStyle, /// Responsible for optimizing a logical plan optimizers: Vec>, /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` @@ -521,6 +530,7 @@ impl ExecutionConfig { concurrency: num_cpus::get(), batch_size: 32768, case_sensitive: true, + case_style: CaseStyle::LikePostgreSQL, optimizers: vec![ Arc::new(ConstantFolding::new()), Arc::new(ProjectionPushDown::new()), @@ -553,6 +563,12 @@ impl ExecutionConfig { self } + /// Customize case style + pub fn with_case_style(mut self, cs: CaseStyle) -> Self { + self.case_style = cs; + self + } + /// Replace the default query planner pub fn with_query_planner( mut self, @@ -1376,7 +1392,7 @@ mod tests { let results = plan_and_collect( &mut ctx, - "SELECT dict, COUNT(val) FROM t GROUP BY dict", + "SELECT dict, count(val) FROM t GROUP BY dict", ) .await .expect("ran plan correctly"); @@ -1394,7 +1410,7 @@ mod tests { // Now, use dict as an aggregate let results = - plan_and_collect(&mut ctx, "SELECT val, COUNT(dict) FROM t GROUP BY val") + plan_and_collect(&mut ctx, "SELECT val, count(dict) FROM t GROUP BY val") .await .expect("ran plan correctly"); diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 540292dd927..83e32555d14 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -21,7 +21,7 @@ use std::str::FromStr; use std::sync::Arc; use crate::datasource::TableProvider; -use crate::execution::context::ExecutionConfig; +use crate::execution::context::{ExecutionConfig, CaseStyle}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, lit, DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, @@ -931,8 +931,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Function(function) => { - let input_name = function.name.to_string(); let case_sensitive = self.schema_provider.config().case_sensitive; + let case_style = self.schema_provider.config().case_style; + + let mut input_name = function.name.to_string(); let name = if !case_sensitive { input_name.to_lowercase() } else { @@ -947,6 +949,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; + if let CaseStyle::LikePostgreSQL = case_style { + input_name = input_name.to_lowercase(); + } + return Ok(Expr::ScalarFunction { input_name, fun, @@ -977,6 +983,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>()? }; + if let CaseStyle::LikePostgreSQL = case_style { + input_name = input_name.to_uppercase(); + } + return Ok(Expr::AggregateFunction { input_name, fun, From 942cad66a7636e133baca3246c7a49c830fbbae7 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Wed, 10 Mar 2021 07:18:53 +0800 Subject: [PATCH 23/28] add documentation for case style enum --- rust/datafusion/src/execution/context.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 5bbf0a98bc2..d77e9acfd4c 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -498,8 +498,10 @@ impl QueryPlanner for DefaultQueryPlanner { /// The style of case display #[derive(Clone, PartialEq)] pub enum CaseStyle { + /// Case style like MySQL, SELECT mD5("a") will output SELECT mD5("a") LikeMySQL, - LikePostgreSQL + /// Case style like PostgreSQL, SELECT mD5("a") will output SELECT md5("a") + LikePostgreSQL, } /// Configuration options for execution context From fc88704de3e85cbf76c6c3cdee6ba9986b1add6e Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Wed, 10 Mar 2021 07:44:52 +0800 Subject: [PATCH 24/28] format code --- rust/datafusion/src/sql/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 83e32555d14..e88a0b1d93a 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -21,7 +21,7 @@ use std::str::FromStr; use std::sync::Arc; use crate::datasource::TableProvider; -use crate::execution::context::{ExecutionConfig, CaseStyle}; +use crate::execution::context::{CaseStyle, ExecutionConfig}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, lit, DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, From dba97ca86313a2dff8b4e32545e2fc3ed6fe327a Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 13 Mar 2021 20:51:06 +0800 Subject: [PATCH 25/28] Resolve some conversation --- rust/datafusion/src/execution/context.rs | 14 +++++++++----- rust/datafusion/src/sql/planner.rs | 10 +++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 3552a07e571..52c663784fc 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -501,9 +501,13 @@ impl QueryPlanner for DefaultQueryPlanner { /// The style of case display #[derive(Clone, PartialEq)] pub enum CaseStyle { - /// Case style like MySQL, SELECT mD5("a") will output SELECT mD5("a") - LikeMySQL, - /// Case style like PostgreSQL, SELECT mD5("a") will output SELECT md5("a") + /// function names in the output are displayed as they were provided: + /// 'mD5("a")' will appear in the results as `mD5("a")`. This + /// mimics MySQL behavior + PreserveCase, + /// function names in the output are displayed in lower case: + /// `mD5("a")` will appear in the results as `md5("a")`This + /// mimics PostgreSQL behavior LikePostgreSQL, } @@ -514,7 +518,7 @@ pub struct ExecutionConfig { pub concurrency: usize, /// Default batch size when reading data sources pub batch_size: usize, - /// Will function names be searched using case-sensitive matching. + /// Whether to use case-sensitive matching for function names. /// If `false` both `"SELECT COUNT(*) FROM t;` and "`SELECT count(*) FROM t;` /// can be used to compute the `COUNT` aggregate. If `true` then only /// `"SELECT count(*) FROM t"` can be used. @@ -625,7 +629,7 @@ impl ContextProvider for ExecutionContextState { self.aggregate_functions.get(name).cloned() } - fn config(&self) -> ExecutionConfig { + fn get_config(&self) -> ExecutionConfig { self.config.clone() } } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index e88a0b1d93a..01afeede8c0 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -68,7 +68,7 @@ pub trait ContextProvider { /// Getter for a UDAF description fn get_aggregate_meta(&self, name: &str) -> Option>; /// Getter for a config - fn config(&self) -> ExecutionConfig; + fn get_config(&self) -> ExecutionConfig; } /// SQL query planner @@ -931,8 +931,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Function(function) => { - let case_sensitive = self.schema_provider.config().case_sensitive; - let case_style = self.schema_provider.config().case_style; + let case_sensitive = self.schema_provider.get_config().case_sensitive; + let case_style = self.schema_provider.get_config().case_style; let mut input_name = function.name.to_string(); let name = if !case_sensitive { @@ -984,7 +984,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; if let CaseStyle::LikePostgreSQL = case_style { - input_name = input_name.to_uppercase(); + input_name = input_name.to_lowercase(); } return Ok(Expr::AggregateFunction { @@ -2486,7 +2486,7 @@ mod tests { unimplemented!() } - fn config(&self) -> ExecutionConfig { + fn get_config(&self) -> ExecutionConfig { ExecutionConfig::new() } } From 2bc66d6eb5272d514fc45d44343f07f0a80875e4 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 13 Mar 2021 21:57:12 +0800 Subject: [PATCH 26/28] Set the input name to uppercase --- rust/datafusion/src/sql/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 01afeede8c0..ae0f6e4e97a 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -984,7 +984,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; if let CaseStyle::LikePostgreSQL = case_style { - input_name = input_name.to_lowercase(); + input_name = input_name.to_uppercase(); } return Ok(Expr::AggregateFunction { From 8c8fa501d24b5d05b87481f87a1a3469a6e18a08 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 13 Mar 2021 22:27:43 +0800 Subject: [PATCH 27/28] Set the input name to uppercase --- rust/datafusion/src/sql/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index ae0f6e4e97a..9fe4f788e4f 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -949,7 +949,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - if let CaseStyle::LikePostgreSQL = case_style { + if CaseStyle::LikePostgreSQL = case_style { input_name = input_name.to_lowercase(); } @@ -983,7 +983,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>()? }; - if let CaseStyle::LikePostgreSQL = case_style { + if CaseStyle::LikePostgreSQL = case_style { input_name = input_name.to_uppercase(); } From ca63ae9e38aeb6d5409efeed79153fe4b1321569 Mon Sep 17 00:00:00 2001 From: wuqingcheng Date: Sat, 13 Mar 2021 22:29:25 +0800 Subject: [PATCH 28/28] Set the input name to uppercase --- rust/datafusion/src/sql/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 9fe4f788e4f..ec99bea48ae 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -949,7 +949,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|a| self.sql_fn_arg_to_logical_expr(a)) .collect::>>()?; - if CaseStyle::LikePostgreSQL = case_style { + if CaseStyle::LikePostgreSQL == case_style { input_name = input_name.to_lowercase(); } @@ -983,7 +983,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>()? }; - if CaseStyle::LikePostgreSQL = case_style { + if CaseStyle::LikePostgreSQL == case_style { input_name = input_name.to_uppercase(); }