From ae0d3a3179c27bf127787db0e42d70f68754b761 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 11:41:52 +0100 Subject: [PATCH 01/24] Start of EXTRACT support for DataFusion --- rust/datafusion/src/logical_plan/expr.rs | 31 ++++++++++++++++++++++++ rust/datafusion/src/logical_plan/mod.rs | 3 ++- rust/datafusion/src/optimizer/utils.rs | 30 +++++++++-------------- rust/datafusion/src/sql/planner.rs | 18 +++++++++++--- rust/datafusion/src/sql/utils.rs | 4 +++ 5 files changed, 63 insertions(+), 23 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 31898d3e695..9fd90975bf8 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -17,6 +17,7 @@ //! This module provides an `Expr` enum for representing expressions such as `col = 5` or `SUM(col)` +use super::temporal::DatePart; pub use super::Operator; use std::fmt; @@ -169,6 +170,13 @@ pub enum Expr { }, /// Represents a reference to all fields in a schema. Wildcard, + /// Extract date parts (day, hour, minute) from a date / time expression + Extract { + /// The date part, e.g. `DatePart::Hour` + date_part: DatePart, + /// The expression being cast + expr: Box, + }, } impl Expr { @@ -237,6 +245,21 @@ impl Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::Extract { + date_part: DatePart::Hour, + expr, + } => { + let inner = expr.get_type(schema)?; + match inner { + DataType::Date32(_) + | DataType::Date64(_) + | DataType::Time32(_) + | DataType::Time64(_) => Ok(DataType::Int32), + _ => Err(DataFusionError::Internal( + "Only Date and Time datatypes supported in Extract".to_owned(), + )), + } + } } } @@ -292,6 +315,7 @@ impl Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::Extract { date_part: _, expr } => expr.nullable(input_schema), } } @@ -524,6 +548,7 @@ impl Expr { .try_fold(visitor, |visitor, arg| arg.accept(visitor)) } Expr::Wildcard => Ok(visitor), + Expr::Extract { date_part: _, expr } => expr.accept(visitor), }?; visitor.post_visit(self) @@ -990,6 +1015,12 @@ impl fmt::Debug for Expr { } } Expr::Wildcard => write!(f, "*"), + Expr::Extract { + date_part: DatePart::Hour, + expr, + } => { + write!(f, "EXTRACT(HOUR FROM {:?})", expr) + } } } } diff --git a/rust/datafusion/src/logical_plan/mod.rs b/rust/datafusion/src/logical_plan/mod.rs index cceb94794b5..ce71929f356 100644 --- a/rust/datafusion/src/logical_plan/mod.rs +++ b/rust/datafusion/src/logical_plan/mod.rs @@ -29,7 +29,7 @@ mod extension; mod operators; mod plan; mod registry; - +mod temporal; pub use builder::LogicalPlanBuilder; pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema}; pub use display::display_schema; @@ -46,3 +46,4 @@ pub use plan::{ JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, StringifiedPlan, }; pub use registry::FunctionRegistry; +pub use temporal::DatePart; diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index 68143b1867f..62fd9d2f671 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -23,8 +23,7 @@ use arrow::datatypes::Schema; use super::optimizer::OptimizerRule; use crate::logical_plan::{ - Expr, LogicalPlan, Operator, Partitioning, PlanType, Recursion, StringifiedPlan, - ToDFSchema, + Expr, LogicalPlan, Operator, Partitioning, PlanType, Recursion, StringifiedPlan, ToDFSchema, }; use crate::prelude::{col, lit}; use crate::scalar::ScalarValue; @@ -38,10 +37,7 @@ const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__"; /// Recursively walk a list of expression trees, collecting the unique set of column /// names referenced in the expression -pub fn exprlist_to_column_names( - expr: &[Expr], - accum: &mut HashSet, -) -> Result<()> { +pub fn exprlist_to_column_names(expr: &[Expr], accum: &mut HashSet) -> Result<()> { for e in expr { expr_to_column_names(e, accum)?; } @@ -80,6 +76,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> { Expr::AggregateUDF { .. } => {} Expr::InList { .. } => {} Expr::Wildcard => {} + Expr::Extract { .. } => {} } Ok(Recursion::Continue(self)) } @@ -140,9 +137,7 @@ pub fn expressions(plan: &LogicalPlan) -> Vec { result.extend(aggr_expr.clone()); result } - LogicalPlan::Join { on, .. } => { - on.iter().flat_map(|(l, r)| vec![col(l), col(r)]).collect() - } + LogicalPlan::Join { on, .. } => on.iter().flat_map(|(l, r)| vec![col(l), col(r)]).collect(), LogicalPlan::Sort { expr, .. } => expr.clone(), LogicalPlan::Extension { node } => node.expressions(), // plans without expressions @@ -299,6 +294,7 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result> { Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::Extract { date_part: _, expr } => Ok(vec![expr.as_ref().to_owned()]), } } @@ -339,15 +335,11 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec) -> Result while i < expressions.len() { match &expressions[i] { - Expr::Literal(ScalarValue::Utf8(Some(str))) - if str == CASE_EXPR_MARKER => - { + Expr::Literal(ScalarValue::Utf8(Some(str))) if str == CASE_EXPR_MARKER => { base_expr = Some(Box::new(expressions[i + 1].clone())); i += 2; } - Expr::Literal(ScalarValue::Utf8(Some(str))) - if str == CASE_ELSE_MARKER => - { + Expr::Literal(ScalarValue::Utf8(Some(str))) if str == CASE_ELSE_MARKER => { else_expr = Some(Box::new(expressions[i + 1].clone())); i += 2; } @@ -371,9 +363,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec) -> Result expr: Box::new(expressions[0].clone()), data_type: data_type.clone(), }), - Expr::Alias(_, alias) => { - Ok(Expr::Alias(Box::new(expressions[0].clone()), alias.clone())) - } + Expr::Alias(_, alias) => Ok(Expr::Alias(Box::new(expressions[0].clone()), alias.clone())), Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))), Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))), Expr::Column(_) => Ok(expr.clone()), @@ -411,6 +401,10 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec) -> Result Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::Extract { date_part, expr: _ } => Ok(Expr::Extract { + date_part: *date_part, + expr: Box::new(expressions[0].clone()), + }), } } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index d2cbd7c4f61..0b4ac9031cb 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -21,6 +21,7 @@ use std::str::FromStr; use std::sync::Arc; use crate::datasource::TableProvider; +use crate::logical_plan::DatePart; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, lit, DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, @@ -41,8 +42,8 @@ use arrow::datatypes::*; use crate::prelude::JoinType; use sqlparser::ast::{ - BinaryOperator, DataType as SQLDataType, Expr as SQLExpr, FunctionArg, Join, - JoinConstraint, JoinOperator, Query, Select, SelectItem, SetExpr, TableFactor, + BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, + Join, JoinConstraint, JoinOperator, Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, UnaryOperator, Value, }; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; @@ -631,6 +632,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Value(Value::SingleQuotedString(ref s)) => Ok(lit(s.clone())), SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))), + SQLExpr::Extract { + field: DateTimeField::Hour, + expr, + } => Ok(Expr::Extract { + date_part: DatePart::Hour, + expr: Box::new(self.sql_expr_to_logical_expr(expr)?), + }), SQLExpr::Identifier(ref id) => { if &id.value[0..1] == "@" { @@ -1132,7 +1140,8 @@ mod tests { #[test] fn test_timestamp_filter() { - let sql = "SELECT state FROM person WHERE birth_date < CAST (158412331400600000 as timestamp)"; + let sql = + "SELECT state FROM person WHERE birth_date < CAST (158412331400600000 as timestamp)"; let expected = "Projection: #state\ \n Filter: #birth_date Lt CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\ @@ -1479,7 +1488,8 @@ mod tests { fn select_simple_aggregate_with_groupby_non_column_expression_nested_and_not_resolvable( ) { // The query should fail, because age + 9 is not in the group by. - let sql = "SELECT ((age + 1) / 2) * (age + 9), MIN(first_name) FROM person GROUP BY age + 1"; + let sql = + "SELECT ((age + 1) / 2) * (age + 9), MIN(first_name) FROM person GROUP BY age + 1"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( "Plan(\"Projection references non-aggregate values\")", diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs index 34bd55df49d..fdbd0bfb482 100644 --- a/rust/datafusion/src/sql/utils.rs +++ b/rust/datafusion/src/sql/utils.rs @@ -332,6 +332,10 @@ where Ok(expr.clone()) } Expr::Wildcard => Ok(Expr::Wildcard), + Expr::Extract { date_part, expr: e } => Ok(Expr::Extract { + date_part: *date_part, + expr: Box::new(clone_with_replacement(e, replacement_fn)?), + }), }, } } From b91632d23786755e20211101f0a47a4738222bbf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 11:46:27 +0100 Subject: [PATCH 02/24] fmt --- rust/datafusion/src/optimizer/utils.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index 62fd9d2f671..f837ee156c8 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -23,7 +23,8 @@ use arrow::datatypes::Schema; use super::optimizer::OptimizerRule; use crate::logical_plan::{ - Expr, LogicalPlan, Operator, Partitioning, PlanType, Recursion, StringifiedPlan, ToDFSchema, + Expr, LogicalPlan, Operator, Partitioning, PlanType, Recursion, StringifiedPlan, + ToDFSchema, }; use crate::prelude::{col, lit}; use crate::scalar::ScalarValue; @@ -37,7 +38,10 @@ const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__"; /// Recursively walk a list of expression trees, collecting the unique set of column /// names referenced in the expression -pub fn exprlist_to_column_names(expr: &[Expr], accum: &mut HashSet) -> Result<()> { +pub fn exprlist_to_column_names( + expr: &[Expr], + accum: &mut HashSet, +) -> Result<()> { for e in expr { expr_to_column_names(e, accum)?; } @@ -137,7 +141,9 @@ pub fn expressions(plan: &LogicalPlan) -> Vec { result.extend(aggr_expr.clone()); result } - LogicalPlan::Join { on, .. } => on.iter().flat_map(|(l, r)| vec![col(l), col(r)]).collect(), + LogicalPlan::Join { on, .. } => { + on.iter().flat_map(|(l, r)| vec![col(l), col(r)]).collect() + } LogicalPlan::Sort { expr, .. } => expr.clone(), LogicalPlan::Extension { node } => node.expressions(), // plans without expressions @@ -335,11 +341,15 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec) -> Result while i < expressions.len() { match &expressions[i] { - Expr::Literal(ScalarValue::Utf8(Some(str))) if str == CASE_EXPR_MARKER => { + Expr::Literal(ScalarValue::Utf8(Some(str))) + if str == CASE_EXPR_MARKER => + { base_expr = Some(Box::new(expressions[i + 1].clone())); i += 2; } - Expr::Literal(ScalarValue::Utf8(Some(str))) if str == CASE_ELSE_MARKER => { + Expr::Literal(ScalarValue::Utf8(Some(str))) + if str == CASE_ELSE_MARKER => + { else_expr = Some(Box::new(expressions[i + 1].clone())); i += 2; } @@ -363,7 +373,9 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec) -> Result expr: Box::new(expressions[0].clone()), data_type: data_type.clone(), }), - Expr::Alias(_, alias) => Ok(Expr::Alias(Box::new(expressions[0].clone()), alias.clone())), + Expr::Alias(_, alias) => { + Ok(Expr::Alias(Box::new(expressions[0].clone()), alias.clone())) + } Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))), Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))), Expr::Column(_) => Ok(expr.clone()), From 304008bd737e2f39bac74d2a5c3c15e6b4f468cf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 12:05:37 +0100 Subject: [PATCH 03/24] Add temporal module --- rust/datafusion/src/logical_plan/temporal.rs | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 rust/datafusion/src/logical_plan/temporal.rs diff --git a/rust/datafusion/src/logical_plan/temporal.rs b/rust/datafusion/src/logical_plan/temporal.rs new file mode 100644 index 00000000000..1c0948ca390 --- /dev/null +++ b/rust/datafusion/src/logical_plan/temporal.rs @@ -0,0 +1,6 @@ +#[derive(Clone, Copy, PartialEq)] +/// Date part +pub enum DatePart { + /// Hour part of date / time + Hour, +} From 3eb552b4c222bb959d8b93a62de09d14703e1f16 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 12:57:45 +0100 Subject: [PATCH 04/24] Test, boilerplate --- rust/datafusion/src/logical_plan/expr.rs | 12 +- rust/datafusion/src/logical_plan/temporal.rs | 2 +- .../src/physical_plan/expressions.rs | 218 ++++++++---------- rust/datafusion/src/physical_plan/planner.rs | 5 + rust/datafusion/tests/sql.rs | 60 ++--- 5 files changed, 146 insertions(+), 151 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 9fd90975bf8..5d69d536881 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -1120,11 +1120,19 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { let expr = create_name(expr, input_schema)?; let list = list.iter().map(|expr| create_name(expr, input_schema)); if *negated { - Ok(format!("{:?} NOT IN ({:?})", expr, list)) + Ok(format!("{} NOT IN ({:?})", expr, list)) } else { - Ok(format!("{:?} IN ({:?})", expr, list)) + Ok(format!("{} IN ({:?})", expr, list)) } } + Expr::Extract { + date_part: DatePart::Hour, + expr, + } => { + let expr = create_name(expr, input_schema)?; + + Ok(format!("EXTRACT(HOUR FROM {})", expr)) + } other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", other diff --git a/rust/datafusion/src/logical_plan/temporal.rs b/rust/datafusion/src/logical_plan/temporal.rs index 1c0948ca390..2a77c4e7023 100644 --- a/rust/datafusion/src/logical_plan/temporal.rs +++ b/rust/datafusion/src/logical_plan/temporal.rs @@ -1,4 +1,4 @@ -#[derive(Clone, Copy, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] /// Date part pub enum DatePart { /// Hour part of date / time diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs index 3ef240dd701..0435b5fc189 100644 --- a/rust/datafusion/src/physical_plan/expressions.rs +++ b/rust/datafusion/src/physical_plan/expressions.rs @@ -24,12 +24,12 @@ use std::sync::Arc; use super::ColumnarValue; use crate::error::{DataFusionError, Result}; +use crate::logical_plan::DatePart; use crate::logical_plan::Operator; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use arrow::array::{ - self, Array, BooleanBuilder, GenericStringArray, LargeStringArray, - StringOffsetSizeTrait, + self, Array, BooleanBuilder, GenericStringArray, LargeStringArray, StringOffsetSizeTrait, }; use arrow::compute; use arrow::compute::kernels; @@ -40,8 +40,8 @@ use arrow::compute::kernels::comparison::{ eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar, }; use arrow::compute::kernels::comparison::{ - eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, like_utf8_scalar, lt_eq_utf8, lt_utf8, - neq_utf8, nlike_utf8, nlike_utf8_scalar, + eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, like_utf8_scalar, lt_eq_utf8, lt_utf8, neq_utf8, + nlike_utf8, nlike_utf8_scalar, }; use arrow::compute::kernels::comparison::{ eq_utf8_scalar, gt_eq_utf8_scalar, gt_utf8_scalar, lt_eq_utf8_scalar, lt_utf8_scalar, @@ -52,9 +52,9 @@ use arrow::datatypes::{DataType, DateUnit, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; use arrow::{ array::{ - ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, - Int16Array, Int32Array, Int64Array, Int8Array, StringArray, - TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, + ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int16Array, + Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray, UInt16Array, + UInt32Array, UInt64Array, UInt8Array, }, datatypes::Field, }; @@ -135,9 +135,7 @@ pub struct Sum { /// function return type of a sum pub fn sum_return_type(arg_type: &DataType) -> Result { match arg_type { - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => { - Ok(DataType::Int64) - } + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => Ok(DataType::Int64), DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { Ok(DataType::UInt64) } @@ -487,9 +485,7 @@ impl Accumulator for AvgAccumulator { fn evaluate(&self) -> Result { match self.sum { - ScalarValue::Float64(e) => { - Ok(ScalarValue::Float64(e.map(|f| f / self.count as f64))) - } + ScalarValue::Float64(e) => Ok(ScalarValue::Float64(e.map(|f| f / self.count as f64))), _ => Err(DataFusionError::Internal( "Sum should be f64 on average".to_string(), )), @@ -1158,11 +1154,7 @@ pub struct BinaryExpr { impl BinaryExpr { /// Create new binary expression - pub fn new( - left: Arc, - op: Operator, - right: Arc, - ) -> Self { + pub fn new(left: Arc, op: Operator, right: Arc) -> Self { Self { left, op, right } } @@ -1189,10 +1181,7 @@ impl fmt::Display for BinaryExpr { } /// Coercion rules for dictionary values (aka the type of the dictionary itself) -fn dictionary_value_coercion( - lhs_type: &DataType, - rhs_type: &DataType, -) -> Option { +fn dictionary_value_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { numerical_coercion(lhs_type, rhs_type).or_else(|| string_coercion(lhs_type, rhs_type)) } @@ -1326,11 +1315,7 @@ fn order_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option /// Coercion rules for all binary operators. Returns the output type /// of applying `op` to an argument of `lhs_type` and `rhs_type`. -fn common_binary_type( - lhs_type: &DataType, - op: &Operator, - rhs_type: &DataType, -) -> Result { +fn common_binary_type(lhs_type: &DataType, op: &Operator, rhs_type: &DataType) -> Result { // This result MUST be compatible with `binary_coerce` let result = match op { Operator::And | Operator::Or => match (lhs_type, rhs_type) { @@ -1396,9 +1381,7 @@ pub fn binary_operator_data_type( | Operator::GtEq | Operator::LtEq => Ok(DataType::Boolean), // math operations return the same value as the common coerced type - Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => { - Ok(common_type) - } + Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => Ok(common_type), Operator::Modulus => Err(DataFusionError::NotImplemented( "Modulus operator is still not supported".to_string(), )), @@ -1686,15 +1669,12 @@ impl PhysicalExpr for NotExpr { let arg = self.arg.evaluate(batch)?; match arg { ColumnarValue::Array(array) => { - let array = - array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal( - "boolean_op failed to downcast array".to_owned(), - ) - })?; + let array = array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal("boolean_op failed to downcast array".to_owned()) + })?; Ok(ColumnarValue::Array(Arc::new( arrow::compute::kernels::boolean::not(array)?, ))) @@ -1715,10 +1695,7 @@ impl PhysicalExpr for NotExpr { /// # Errors /// /// This function errors when the argument's type is not boolean -pub fn not( - arg: Arc, - input_schema: &Schema, -) -> Result> { +pub fn not(arg: Arc, input_schema: &Schema) -> Result> { let data_type = arg.data_type(input_schema)?; if data_type != DataType::Boolean { Err(DataFusionError::Internal(format!( @@ -1788,9 +1765,7 @@ impl PhysicalExpr for NegativeExpr { }; result.map(|a| ColumnarValue::Array(a)) } - ColumnarValue::Scalar(scalar) => { - Ok(ColumnarValue::Scalar(scalar.arithmetic_negate())) - } + ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(scalar.arithmetic_negate())), } } } @@ -1861,9 +1836,9 @@ impl PhysicalExpr for IsNullExpr { ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( arrow::compute::is_null(array.as_ref())?, ))), - ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( - ScalarValue::Boolean(Some(scalar.is_null())), - )), + ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some( + scalar.is_null(), + )))), } } } @@ -1918,9 +1893,9 @@ impl PhysicalExpr for IsNotNullExpr { ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( arrow::compute::is_not_null(array.as_ref())?, ))), - ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( - ScalarValue::Boolean(Some(!scalar.is_null())), - )), + ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some( + !scalar.is_null(), + )))), } } } @@ -2402,9 +2377,7 @@ pub fn is_signed_numeric(dt: &DataType) -> bool { pub fn is_numeric(dt: &DataType) -> bool { is_signed_numeric(dt) || match dt { - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { - true - } + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => true, _ => false, } } @@ -2553,6 +2526,44 @@ pub struct InListExpr { negated: bool, } +impl fmt::Display for Extract { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + //write!(f, "{}", self.value) + Ok(()) + } +} + +/// InList +#[derive(Debug)] +pub struct Extract { + date_part: DatePart, + expr: Arc, +} + +impl Extract { + pub fn new(date_part: DatePart, expr: Arc) -> Self { + Self { date_part, expr } + } +} + +impl PhysicalExpr for Extract { + fn as_any(&self) -> &dyn Any { + todo!() + } + + fn data_type(&self, input_schema: &Schema) -> Result { + todo!() + } + + fn nullable(&self, input_schema: &Schema) -> Result { + todo!() + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + todo!() + } +} + macro_rules! make_contains { ($ARRAY:expr, $LIST_VALUES:expr, $NEGATED:expr, $SCALAR_VALUE:ident, $ARRAY_TYPE:ident) => {{ let array = $ARRAY.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap(); @@ -2776,9 +2787,7 @@ impl PhysicalExpr for InListExpr { make_contains!(array, list_values, self.negated, Boolean, BooleanArray) } DataType::Utf8 => self.compare_utf8::(array, list_values, self.negated), - DataType::LargeUtf8 => { - self.compare_utf8::(array, list_values, self.negated) - } + DataType::LargeUtf8 => self.compare_utf8::(array, list_values, self.negated), datatype => { unimplemented!("InList does not support datatype {:?}.", datatype) } @@ -2795,6 +2804,11 @@ pub fn in_list( Ok(Arc::new(InListExpr::new(expr, list, *negated))) } +/// Creates a expression Extract +pub fn Extract(date_part: DatePart, expr: Arc) -> Result> { + Ok(Arc::new(Extract::new(date_part, expr))) +} + #[cfg(test)] mod tests { use super::*; @@ -2826,8 +2840,7 @@ mod tests { ]); let a = Int32Array::from(vec![1, 2, 3, 4, 5]); let b = Int32Array::from(vec![1, 2, 4, 8, 16]); - let batch = - RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; // expression: "a < b" let lt = binary_simple(col("a"), Operator::Lt, col("b")); @@ -2854,8 +2867,7 @@ mod tests { ]); let a = Int32Array::from(vec![2, 4, 6, 8, 10]); let b = Int32Array::from(vec![2, 5, 4, 8, 8]); - let batch = - RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; // expression: "a < b OR a == b" let expr = binary_simple( @@ -2918,10 +2930,8 @@ mod tests { ]); let a = $A_ARRAY::from($A_VEC); let b = $B_ARRAY::from($B_VEC); - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(a), Arc::new(b)], - )?; + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a), Arc::new(b)])?; // verify that we can construct the expression let expression = binary(col("a"), $OP, col("b"), &schema)?; @@ -3092,8 +3102,7 @@ mod tests { #[test] fn test_dictionary_type_to_array_coersion() -> Result<()> { // Test string a string dictionary - let dict_type = - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); let string_type = DataType::Utf8; // build dictionary @@ -3107,8 +3116,7 @@ mod tests { dict_builder.append("four")?; let dict_array = dict_builder.finish(); - let str_array = - StringArray::from(vec![Some("not one"), Some("two"), None, Some("four")]); + let str_array = StringArray::from(vec![Some("not one"), Some("two"), None, Some("four")]); let schema = Arc::new(Schema::new(vec![ Field::new("dict", dict_type, true), @@ -3163,8 +3171,7 @@ mod tests { #[test] fn test_coersion_error() -> Result<()> { - let expr = - common_binary_type(&DataType::Float32, &Operator::Plus, &DataType::Utf8); + let expr = common_binary_type(&DataType::Float32, &Operator::Plus, &DataType::Utf8); if let Err(DataFusionError::Plan(e)) = expr { assert_eq!(e, "'Float32 + Utf8' can't be evaluated because there isn't a common type to coerce the types to"); @@ -3186,8 +3193,7 @@ mod tests { ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $TYPEARRAY:ident, $TYPE:expr, $VEC:expr) => {{ let schema = Schema::new(vec![Field::new("a", $A_TYPE, false)]); let a = $A_ARRAY::from($A_VEC); - let batch = - RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; // verify that we can construct the expression let expression = cast(col("a"), &schema, $TYPE)?; @@ -3280,8 +3286,7 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY])?; - let agg = - Arc::new(<$OP>::new(col("a"), "bla".to_string(), $EXPECTED_DATATYPE)); + let agg = Arc::new(<$OP>::new(col("a"), "bla".to_string(), $EXPECTED_DATATYPE)); let actual = aggregate(&batch, agg)?; let expected = ScalarValue::from($EXPECTED); @@ -3509,8 +3514,7 @@ mod tests { #[test] fn sum_u32() -> Result<()> { - let a: ArrayRef = - Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3522,8 +3526,7 @@ mod tests { #[test] fn avg_u32() -> Result<()> { - let a: ArrayRef = - Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3535,8 +3538,7 @@ mod tests { #[test] fn max_u32() -> Result<()> { - let a: ArrayRef = - Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3548,8 +3550,7 @@ mod tests { #[test] fn min_u32() -> Result<()> { - let a: ArrayRef = - Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3561,8 +3562,7 @@ mod tests { #[test] fn sum_f32() -> Result<()> { - let a: ArrayRef = - Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3574,8 +3574,7 @@ mod tests { #[test] fn avg_f32() -> Result<()> { - let a: ArrayRef = - Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3587,8 +3586,7 @@ mod tests { #[test] fn max_f32() -> Result<()> { - let a: ArrayRef = - Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3600,8 +3598,7 @@ mod tests { #[test] fn min_f32() -> Result<()> { - let a: ArrayRef = - Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3613,8 +3610,7 @@ mod tests { #[test] fn sum_f64() -> Result<()> { - let a: ArrayRef = - Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3626,8 +3622,7 @@ mod tests { #[test] fn avg_f64() -> Result<()> { - let a: ArrayRef = - Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3639,8 +3634,7 @@ mod tests { #[test] fn max_f64() -> Result<()> { - let a: ArrayRef = - Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3652,8 +3646,7 @@ mod tests { #[test] fn min_f64() -> Result<()> { - let a: ArrayRef = - Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3723,8 +3716,7 @@ mod tests { #[test] fn count_utf8() -> Result<()> { - let a: ArrayRef = - Arc::new(StringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); + let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); generic_test_op!( a, DataType::Utf8, @@ -3736,8 +3728,7 @@ mod tests { #[test] fn count_large_utf8() -> Result<()> { - let a: ArrayRef = - Arc::new(LargeStringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); + let a: ArrayRef = Arc::new(LargeStringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); generic_test_op!( a, DataType::LargeUtf8, @@ -3811,10 +3802,7 @@ mod tests { Ok(()) } - fn aggregate( - batch: &RecordBatch, - agg: Arc, - ) -> Result { + fn aggregate(batch: &RecordBatch, agg: Arc) -> Result { let mut accum = agg.create_accumulator()?; let expr = agg.expressions(); let values = expr @@ -3925,10 +3913,7 @@ mod tests { Ok(()) } - fn assert_array_eq( - expected: PrimitiveArray, - actual: ArrayRef, - ) { + fn assert_array_eq(expected: PrimitiveArray, actual: ArrayRef) { let actual = actual .as_any() .downcast_ref::>() @@ -3954,8 +3939,7 @@ mod tests { let input = BooleanArray::from(vec![Some(true), None, Some(false)]); let expected = &BooleanArray::from(vec![Some(false), None, Some(true)]); - let batch = - RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); let result = result @@ -4066,8 +4050,7 @@ mod tests { .downcast_ref::() .expect("failed to downcast to Int32Array"); - let expected = - &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); + let expected = &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); assert_eq!(expected, result); @@ -4136,8 +4119,7 @@ mod tests { .downcast_ref::() .expect("failed to downcast to Int32Array"); - let expected = - &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); + let expected = &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); assert_eq!(expected, result); diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 6ba0d2606a2..e6bc98e59fa 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -647,6 +647,11 @@ impl DefaultPhysicalPlanner { expressions::in_list(value_expr, list_exprs, negated) } }, + Expr::Extract{date_part, expr} => { + let value_expr = self.create_physical_expr(expr, input_schema, ctx_state)?; + let value_expr_data_type = value_expr.data_type(input_schema)?; + + } other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", other diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 14b76a59299..7b4425efd8f 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -350,8 +350,7 @@ async fn csv_query_group_by_float32() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_simple_csv(&mut ctx)?; - let sql = - "SELECT COUNT(*) as cnt, c1 FROM aggregate_simple GROUP BY c1 ORDER BY cnt DESC"; + let sql = "SELECT COUNT(*) as cnt, c1 FROM aggregate_simple GROUP BY c1 ORDER BY cnt DESC"; let actual = execute(&mut ctx, sql).await; let expected = vec![ @@ -371,8 +370,7 @@ async fn csv_query_group_by_float64() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_simple_csv(&mut ctx)?; - let sql = - "SELECT COUNT(*) as cnt, c2 FROM aggregate_simple GROUP BY c2 ORDER BY cnt DESC"; + let sql = "SELECT COUNT(*) as cnt, c2 FROM aggregate_simple GROUP BY c2 ORDER BY cnt DESC"; let actual = execute(&mut ctx, sql).await; let expected = vec![ @@ -392,8 +390,7 @@ async fn csv_query_group_by_boolean() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_simple_csv(&mut ctx)?; - let sql = - "SELECT COUNT(*) as cnt, c3 FROM aggregate_simple GROUP BY c3 ORDER BY cnt DESC"; + let sql = "SELECT COUNT(*) as cnt, c3 FROM aggregate_simple GROUP BY c3 ORDER BY cnt DESC"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["9", "true"], vec!["6", "false"]]; @@ -710,7 +707,8 @@ async fn csv_query_cast() -> Result<()> { async fn csv_query_cast_literal() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_csv(&mut ctx)?; - let sql = "SELECT c12, CAST(1 AS float) FROM aggregate_test_100 WHERE c12 > CAST(0 AS float) LIMIT 2"; + let sql = + "SELECT c12, CAST(1 AS float) FROM aggregate_test_100 WHERE c12 > CAST(0 AS float) LIMIT 2"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["0.9294097332465232", "1"], @@ -1005,8 +1003,7 @@ async fn csv_query_external_table_sum() { let mut ctx = ExecutionContext::new(); // cast smallint and int to bigint to avoid overflow during calculation register_aggregate_csv_by_sql(&mut ctx).await; - let sql = - "SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100"; + let sql = "SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["13060", "3017641"]]; assert_eq!(expected, actual); @@ -1108,8 +1105,7 @@ fn create_case_context() -> Result { #[tokio::test] async fn equijoin() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = - "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], @@ -1123,8 +1119,7 @@ async fn equijoin() -> Result<()> { #[tokio::test] async fn left_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = - "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], @@ -1170,8 +1165,7 @@ async fn left_join_using() -> Result<()> { #[tokio::test] async fn equijoin_implicit_syntax() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = - "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id"; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], @@ -1204,8 +1198,7 @@ async fn equijoin_implicit_syntax_with_filter() -> Result<()> { #[tokio::test] async fn equijoin_implicit_syntax_reversed() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = - "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id"; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], @@ -1228,10 +1221,7 @@ async fn cartesian_join() -> Result<()> { Ok(()) } -fn create_join_context( - column_left: &str, - column_right: &str, -) -> Result { +fn create_join_context(column_left: &str, column_right: &str) -> Result { let mut ctx = ExecutionContext::new(); let t1_schema = Arc::new(Schema::new(vec![ @@ -1763,8 +1753,7 @@ async fn query_count_distinct() -> Result<()> { async fn query_on_string_dictionary() -> Result<()> { // Test to ensure DataFusion can operate on dictionary types // Use StringDictionary (32 bit indexes = keys) - let field_type = - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let field_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)])); let keys_builder = PrimitiveBuilder::::new(10); @@ -1920,8 +1909,7 @@ async fn string_expressions() -> Result<()> { let actual = execute(&mut ctx, sql).await; let expected = vec![vec![ - "3", "NULL", "3", "NULL", "tom", "NULL", "TOM", "NULL", "tom", "NULL", "tom ", - " tom", + "3", "NULL", "3", "NULL", "tom", "NULL", "TOM", "NULL", "tom", "NULL", "tom ", " tom", ]]; assert_eq!(expected, actual); Ok(()) @@ -1964,6 +1952,19 @@ async fn crypto_expressions() -> Result<()> { Ok(()) } +#[tokio::test] +async fn extract_date_part() -> Result<()> { + let mut ctx = ExecutionContext::new(); + let sql = "SELECT + EXTRACT(HOUR FROM CAST('2020-01-01' AS date)) + "; + let actual = execute(&mut ctx, sql).await; + + let expected = vec![vec!["0"]]; + assert_eq!(expected, actual); + Ok(()) +} + #[tokio::test] async fn in_list_array() -> Result<()> { let mut ctx = ExecutionContext::new(); @@ -2040,11 +2041,10 @@ async fn in_list_scalar() -> Result<()> { let actual = execute(&mut ctx, sql).await; let expected = vec![vec![ - "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", "false", - "NULL", "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", - "false", "NULL", "true", "false", "true", "false", "NULL", "NULL", "true", - "NULL", "false", "NULL", "true", "false", "true", "false", "NULL", "NULL", - "true", "NULL", "false", "NULL", + "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", "false", "NULL", "true", + "false", "true", "false", "NULL", "NULL", "true", "NULL", "false", "NULL", "true", "false", + "true", "false", "NULL", "NULL", "true", "NULL", "false", "NULL", "true", "false", "true", + "false", "NULL", "NULL", "true", "NULL", "false", "NULL", ]]; assert_eq!(expected, actual); Ok(()) From 8284f1c05c763cc906e0f8cf5c1f8135896cb61b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 12:58:43 +0100 Subject: [PATCH 05/24] Add todo!() --- rust/datafusion/src/physical_plan/planner.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index e6bc98e59fa..bc6b26069ad 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -651,6 +651,7 @@ impl DefaultPhysicalPlanner { let value_expr = self.create_physical_expr(expr, input_schema, ctx_state)?; let value_expr_data_type = value_expr.data_type(input_schema)?; + todo!("") } other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", From dbbb6477559c13dbcdebfeab5cbee3b6b995e928 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 15:24:30 +0100 Subject: [PATCH 06/24] Extract implementation --- rust/datafusion/src/logical_plan/temporal.rs | 12 + .../src/physical_plan/expressions.rs | 206 ++++++++++++------ rust/datafusion/src/physical_plan/planner.rs | 9 +- rust/datafusion/tests/sql.rs | 41 ++-- 4 files changed, 184 insertions(+), 84 deletions(-) diff --git a/rust/datafusion/src/logical_plan/temporal.rs b/rust/datafusion/src/logical_plan/temporal.rs index 2a77c4e7023..cf533c38121 100644 --- a/rust/datafusion/src/logical_plan/temporal.rs +++ b/rust/datafusion/src/logical_plan/temporal.rs @@ -1,6 +1,18 @@ +use core::fmt; + #[derive(Clone, Copy, Debug, PartialEq)] /// Date part pub enum DatePart { /// Hour part of date / time Hour, } + +impl fmt::Display for DatePart { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + DatePart::Hour => { + write!(f, "HOUR") + } + } + } +} diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs index 0435b5fc189..3cb07bb8f47 100644 --- a/rust/datafusion/src/physical_plan/expressions.rs +++ b/rust/datafusion/src/physical_plan/expressions.rs @@ -28,9 +28,6 @@ use crate::logical_plan::DatePart; use crate::logical_plan::Operator; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; -use arrow::array::{ - self, Array, BooleanBuilder, GenericStringArray, LargeStringArray, StringOffsetSizeTrait, -}; use arrow::compute; use arrow::compute::kernels; use arrow::compute::kernels::arithmetic::{add, divide, multiply, negate, subtract}; @@ -40,8 +37,8 @@ use arrow::compute::kernels::comparison::{ eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar, }; use arrow::compute::kernels::comparison::{ - eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, like_utf8_scalar, lt_eq_utf8, lt_utf8, neq_utf8, - nlike_utf8, nlike_utf8_scalar, + eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, like_utf8_scalar, lt_eq_utf8, lt_utf8, + neq_utf8, nlike_utf8, nlike_utf8_scalar, }; use arrow::compute::kernels::comparison::{ eq_utf8_scalar, gt_eq_utf8_scalar, gt_utf8_scalar, lt_eq_utf8_scalar, lt_utf8_scalar, @@ -52,9 +49,16 @@ use arrow::datatypes::{DataType, DateUnit, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; use arrow::{ array::{ - ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int16Array, - Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray, UInt16Array, - UInt32Array, UInt64Array, UInt8Array, + self, Array, BooleanBuilder, GenericStringArray, LargeStringArray, + StringOffsetSizeTrait, + }, + compute::hour, +}; +use arrow::{ + array::{ + ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, StringArray, + TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, datatypes::Field, }; @@ -135,7 +139,9 @@ pub struct Sum { /// function return type of a sum pub fn sum_return_type(arg_type: &DataType) -> Result { match arg_type { - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => Ok(DataType::Int64), + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => { + Ok(DataType::Int64) + } DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { Ok(DataType::UInt64) } @@ -485,7 +491,9 @@ impl Accumulator for AvgAccumulator { fn evaluate(&self) -> Result { match self.sum { - ScalarValue::Float64(e) => Ok(ScalarValue::Float64(e.map(|f| f / self.count as f64))), + ScalarValue::Float64(e) => { + Ok(ScalarValue::Float64(e.map(|f| f / self.count as f64))) + } _ => Err(DataFusionError::Internal( "Sum should be f64 on average".to_string(), )), @@ -1154,7 +1162,11 @@ pub struct BinaryExpr { impl BinaryExpr { /// Create new binary expression - pub fn new(left: Arc, op: Operator, right: Arc) -> Self { + pub fn new( + left: Arc, + op: Operator, + right: Arc, + ) -> Self { Self { left, op, right } } @@ -1181,7 +1193,10 @@ impl fmt::Display for BinaryExpr { } /// Coercion rules for dictionary values (aka the type of the dictionary itself) -fn dictionary_value_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { +fn dictionary_value_coercion( + lhs_type: &DataType, + rhs_type: &DataType, +) -> Option { numerical_coercion(lhs_type, rhs_type).or_else(|| string_coercion(lhs_type, rhs_type)) } @@ -1315,7 +1330,11 @@ fn order_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option /// Coercion rules for all binary operators. Returns the output type /// of applying `op` to an argument of `lhs_type` and `rhs_type`. -fn common_binary_type(lhs_type: &DataType, op: &Operator, rhs_type: &DataType) -> Result { +fn common_binary_type( + lhs_type: &DataType, + op: &Operator, + rhs_type: &DataType, +) -> Result { // This result MUST be compatible with `binary_coerce` let result = match op { Operator::And | Operator::Or => match (lhs_type, rhs_type) { @@ -1381,7 +1400,9 @@ pub fn binary_operator_data_type( | Operator::GtEq | Operator::LtEq => Ok(DataType::Boolean), // math operations return the same value as the common coerced type - Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => Ok(common_type), + Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => { + Ok(common_type) + } Operator::Modulus => Err(DataFusionError::NotImplemented( "Modulus operator is still not supported".to_string(), )), @@ -1669,12 +1690,15 @@ impl PhysicalExpr for NotExpr { let arg = self.arg.evaluate(batch)?; match arg { ColumnarValue::Array(array) => { - let array = array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal("boolean_op failed to downcast array".to_owned()) - })?; + let array = + array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal( + "boolean_op failed to downcast array".to_owned(), + ) + })?; Ok(ColumnarValue::Array(Arc::new( arrow::compute::kernels::boolean::not(array)?, ))) @@ -1695,7 +1719,10 @@ impl PhysicalExpr for NotExpr { /// # Errors /// /// This function errors when the argument's type is not boolean -pub fn not(arg: Arc, input_schema: &Schema) -> Result> { +pub fn not( + arg: Arc, + input_schema: &Schema, +) -> Result> { let data_type = arg.data_type(input_schema)?; if data_type != DataType::Boolean { Err(DataFusionError::Internal(format!( @@ -1765,7 +1792,9 @@ impl PhysicalExpr for NegativeExpr { }; result.map(|a| ColumnarValue::Array(a)) } - ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(scalar.arithmetic_negate())), + ColumnarValue::Scalar(scalar) => { + Ok(ColumnarValue::Scalar(scalar.arithmetic_negate())) + } } } } @@ -1836,9 +1865,9 @@ impl PhysicalExpr for IsNullExpr { ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( arrow::compute::is_null(array.as_ref())?, ))), - ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some( - scalar.is_null(), - )))), + ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( + ScalarValue::Boolean(Some(scalar.is_null())), + )), } } } @@ -1893,9 +1922,9 @@ impl PhysicalExpr for IsNotNullExpr { ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( arrow::compute::is_not_null(array.as_ref())?, ))), - ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some( - !scalar.is_null(), - )))), + ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( + ScalarValue::Boolean(Some(!scalar.is_null())), + )), } } } @@ -2377,7 +2406,9 @@ pub fn is_signed_numeric(dt: &DataType) -> bool { pub fn is_numeric(dt: &DataType) -> bool { is_signed_numeric(dt) || match dt { - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => true, + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { + true + } _ => false, } } @@ -2528,8 +2559,7 @@ pub struct InListExpr { impl fmt::Display for Extract { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - //write!(f, "{}", self.value) - Ok(()) + write!(f, "EXTRACT({} AS {:?})", self.date_part, self.expr) } } @@ -2541,6 +2571,7 @@ pub struct Extract { } impl Extract { + /// Create new Extract expression pub fn new(date_part: DatePart, expr: Arc) -> Self { Self { date_part, expr } } @@ -2551,16 +2582,24 @@ impl PhysicalExpr for Extract { todo!() } - fn data_type(&self, input_schema: &Schema) -> Result { - todo!() + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::Int32) } fn nullable(&self, input_schema: &Schema) -> Result { - todo!() + self.expr.nullable(input_schema) } fn evaluate(&self, batch: &RecordBatch) -> Result { - todo!() + let value = self.expr.evaluate(batch)?; + let array = match value { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(scalar) => scalar.to_array(), + }; + + let array = array.as_any().downcast_ref::().unwrap(); + + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) } } @@ -2787,7 +2826,9 @@ impl PhysicalExpr for InListExpr { make_contains!(array, list_values, self.negated, Boolean, BooleanArray) } DataType::Utf8 => self.compare_utf8::(array, list_values, self.negated), - DataType::LargeUtf8 => self.compare_utf8::(array, list_values, self.negated), + DataType::LargeUtf8 => { + self.compare_utf8::(array, list_values, self.negated) + } datatype => { unimplemented!("InList does not support datatype {:?}.", datatype) } @@ -2805,7 +2846,10 @@ pub fn in_list( } /// Creates a expression Extract -pub fn Extract(date_part: DatePart, expr: Arc) -> Result> { +pub fn extract( + date_part: DatePart, + expr: Arc, +) -> Result> { Ok(Arc::new(Extract::new(date_part, expr))) } @@ -2840,7 +2884,8 @@ mod tests { ]); let a = Int32Array::from(vec![1, 2, 3, 4, 5]); let b = Int32Array::from(vec![1, 2, 4, 8, 16]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; // expression: "a < b" let lt = binary_simple(col("a"), Operator::Lt, col("b")); @@ -2867,7 +2912,8 @@ mod tests { ]); let a = Int32Array::from(vec![2, 4, 6, 8, 10]); let b = Int32Array::from(vec![2, 5, 4, 8, 8]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; // expression: "a < b OR a == b" let expr = binary_simple( @@ -2930,8 +2976,10 @@ mod tests { ]); let a = $A_ARRAY::from($A_VEC); let b = $B_ARRAY::from($B_VEC); - let batch = - RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a), Arc::new(b)])?; + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a), Arc::new(b)], + )?; // verify that we can construct the expression let expression = binary(col("a"), $OP, col("b"), &schema)?; @@ -3102,7 +3150,8 @@ mod tests { #[test] fn test_dictionary_type_to_array_coersion() -> Result<()> { // Test string a string dictionary - let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let dict_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); let string_type = DataType::Utf8; // build dictionary @@ -3116,7 +3165,8 @@ mod tests { dict_builder.append("four")?; let dict_array = dict_builder.finish(); - let str_array = StringArray::from(vec![Some("not one"), Some("two"), None, Some("four")]); + let str_array = + StringArray::from(vec![Some("not one"), Some("two"), None, Some("four")]); let schema = Arc::new(Schema::new(vec![ Field::new("dict", dict_type, true), @@ -3171,7 +3221,8 @@ mod tests { #[test] fn test_coersion_error() -> Result<()> { - let expr = common_binary_type(&DataType::Float32, &Operator::Plus, &DataType::Utf8); + let expr = + common_binary_type(&DataType::Float32, &Operator::Plus, &DataType::Utf8); if let Err(DataFusionError::Plan(e)) = expr { assert_eq!(e, "'Float32 + Utf8' can't be evaluated because there isn't a common type to coerce the types to"); @@ -3193,7 +3244,8 @@ mod tests { ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $TYPEARRAY:ident, $TYPE:expr, $VEC:expr) => {{ let schema = Schema::new(vec![Field::new("a", $A_TYPE, false)]); let a = $A_ARRAY::from($A_VEC); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; // verify that we can construct the expression let expression = cast(col("a"), &schema, $TYPE)?; @@ -3286,7 +3338,8 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY])?; - let agg = Arc::new(<$OP>::new(col("a"), "bla".to_string(), $EXPECTED_DATATYPE)); + let agg = + Arc::new(<$OP>::new(col("a"), "bla".to_string(), $EXPECTED_DATATYPE)); let actual = aggregate(&batch, agg)?; let expected = ScalarValue::from($EXPECTED); @@ -3514,7 +3567,8 @@ mod tests { #[test] fn sum_u32() -> Result<()> { - let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = + Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3526,7 +3580,8 @@ mod tests { #[test] fn avg_u32() -> Result<()> { - let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = + Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3538,7 +3593,8 @@ mod tests { #[test] fn max_u32() -> Result<()> { - let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = + Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3550,7 +3606,8 @@ mod tests { #[test] fn min_u32() -> Result<()> { - let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + let a: ArrayRef = + Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); generic_test_op!( a, DataType::UInt32, @@ -3562,7 +3619,8 @@ mod tests { #[test] fn sum_f32() -> Result<()> { - let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = + Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3574,7 +3632,8 @@ mod tests { #[test] fn avg_f32() -> Result<()> { - let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = + Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3586,7 +3645,8 @@ mod tests { #[test] fn max_f32() -> Result<()> { - let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = + Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3598,7 +3658,8 @@ mod tests { #[test] fn min_f32() -> Result<()> { - let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + let a: ArrayRef = + Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); generic_test_op!( a, DataType::Float32, @@ -3610,7 +3671,8 @@ mod tests { #[test] fn sum_f64() -> Result<()> { - let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = + Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3622,7 +3684,8 @@ mod tests { #[test] fn avg_f64() -> Result<()> { - let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = + Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3634,7 +3697,8 @@ mod tests { #[test] fn max_f64() -> Result<()> { - let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = + Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3646,7 +3710,8 @@ mod tests { #[test] fn min_f64() -> Result<()> { - let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let a: ArrayRef = + Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!( a, DataType::Float64, @@ -3716,7 +3781,8 @@ mod tests { #[test] fn count_utf8() -> Result<()> { - let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); + let a: ArrayRef = + Arc::new(StringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); generic_test_op!( a, DataType::Utf8, @@ -3728,7 +3794,8 @@ mod tests { #[test] fn count_large_utf8() -> Result<()> { - let a: ArrayRef = Arc::new(LargeStringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); + let a: ArrayRef = + Arc::new(LargeStringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"])); generic_test_op!( a, DataType::LargeUtf8, @@ -3802,7 +3869,10 @@ mod tests { Ok(()) } - fn aggregate(batch: &RecordBatch, agg: Arc) -> Result { + fn aggregate( + batch: &RecordBatch, + agg: Arc, + ) -> Result { let mut accum = agg.create_accumulator()?; let expr = agg.expressions(); let values = expr @@ -3913,7 +3983,10 @@ mod tests { Ok(()) } - fn assert_array_eq(expected: PrimitiveArray, actual: ArrayRef) { + fn assert_array_eq( + expected: PrimitiveArray, + actual: ArrayRef, + ) { let actual = actual .as_any() .downcast_ref::>() @@ -3939,7 +4012,8 @@ mod tests { let input = BooleanArray::from(vec![Some(true), None, Some(false)]); let expected = &BooleanArray::from(vec![Some(false), None, Some(true)]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); let result = result @@ -4050,7 +4124,8 @@ mod tests { .downcast_ref::() .expect("failed to downcast to Int32Array"); - let expected = &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); + let expected = + &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); assert_eq!(expected, result); @@ -4119,7 +4194,8 @@ mod tests { .downcast_ref::() .expect("failed to downcast to Int32Array"); - let expected = &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); + let expected = + &Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]); assert_eq!(expected, result); diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index bc6b26069ad..e17f40091ad 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -647,11 +647,10 @@ impl DefaultPhysicalPlanner { expressions::in_list(value_expr, list_exprs, negated) } }, - Expr::Extract{date_part, expr} => { - let value_expr = self.create_physical_expr(expr, input_schema, ctx_state)?; - let value_expr_data_type = value_expr.data_type(input_schema)?; - - todo!("") + Expr::Extract { date_part, expr } => { + let value_expr = + self.create_physical_expr(expr, input_schema, ctx_state)?; + expressions::extract(*date_part, value_expr) } other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 7b4425efd8f..6c169b82329 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -350,7 +350,8 @@ async fn csv_query_group_by_float32() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_simple_csv(&mut ctx)?; - let sql = "SELECT COUNT(*) as cnt, c1 FROM aggregate_simple GROUP BY c1 ORDER BY cnt DESC"; + let sql = + "SELECT COUNT(*) as cnt, c1 FROM aggregate_simple GROUP BY c1 ORDER BY cnt DESC"; let actual = execute(&mut ctx, sql).await; let expected = vec![ @@ -370,7 +371,8 @@ async fn csv_query_group_by_float64() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_simple_csv(&mut ctx)?; - let sql = "SELECT COUNT(*) as cnt, c2 FROM aggregate_simple GROUP BY c2 ORDER BY cnt DESC"; + let sql = + "SELECT COUNT(*) as cnt, c2 FROM aggregate_simple GROUP BY c2 ORDER BY cnt DESC"; let actual = execute(&mut ctx, sql).await; let expected = vec![ @@ -390,7 +392,8 @@ async fn csv_query_group_by_boolean() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_simple_csv(&mut ctx)?; - let sql = "SELECT COUNT(*) as cnt, c3 FROM aggregate_simple GROUP BY c3 ORDER BY cnt DESC"; + let sql = + "SELECT COUNT(*) as cnt, c3 FROM aggregate_simple GROUP BY c3 ORDER BY cnt DESC"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["9", "true"], vec!["6", "false"]]; @@ -1003,7 +1006,8 @@ async fn csv_query_external_table_sum() { let mut ctx = ExecutionContext::new(); // cast smallint and int to bigint to avoid overflow during calculation register_aggregate_csv_by_sql(&mut ctx).await; - let sql = "SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100"; + let sql = + "SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["13060", "3017641"]]; assert_eq!(expected, actual); @@ -1105,7 +1109,8 @@ fn create_case_context() -> Result { #[tokio::test] async fn equijoin() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], @@ -1165,7 +1170,8 @@ async fn left_join_using() -> Result<()> { #[tokio::test] async fn equijoin_implicit_syntax() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id"; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], @@ -1198,7 +1204,8 @@ async fn equijoin_implicit_syntax_with_filter() -> Result<()> { #[tokio::test] async fn equijoin_implicit_syntax_reversed() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id"; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], @@ -1221,7 +1228,10 @@ async fn cartesian_join() -> Result<()> { Ok(()) } -fn create_join_context(column_left: &str, column_right: &str) -> Result { +fn create_join_context( + column_left: &str, + column_right: &str, +) -> Result { let mut ctx = ExecutionContext::new(); let t1_schema = Arc::new(Schema::new(vec![ @@ -1753,7 +1763,8 @@ async fn query_count_distinct() -> Result<()> { async fn query_on_string_dictionary() -> Result<()> { // Test to ensure DataFusion can operate on dictionary types // Use StringDictionary (32 bit indexes = keys) - let field_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let field_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)])); let keys_builder = PrimitiveBuilder::::new(10); @@ -1909,7 +1920,8 @@ async fn string_expressions() -> Result<()> { let actual = execute(&mut ctx, sql).await; let expected = vec![vec![ - "3", "NULL", "3", "NULL", "tom", "NULL", "TOM", "NULL", "tom", "NULL", "tom ", " tom", + "3", "NULL", "3", "NULL", "tom", "NULL", "TOM", "NULL", "tom", "NULL", "tom ", + " tom", ]]; assert_eq!(expected, actual); Ok(()) @@ -2041,10 +2053,11 @@ async fn in_list_scalar() -> Result<()> { let actual = execute(&mut ctx, sql).await; let expected = vec![vec![ - "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", "false", "NULL", "true", - "false", "true", "false", "NULL", "NULL", "true", "NULL", "false", "NULL", "true", "false", - "true", "false", "NULL", "NULL", "true", "NULL", "false", "NULL", "true", "false", "true", - "false", "NULL", "NULL", "true", "NULL", "false", "NULL", + "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", "false", + "NULL", "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", + "false", "NULL", "true", "false", "true", "false", "NULL", "NULL", "true", + "NULL", "false", "NULL", "true", "false", "true", "false", "NULL", "NULL", + "true", "NULL", "false", "NULL", ]]; assert_eq!(expected, actual); Ok(()) From a471f57db5b85d0b7a49acc9e9d9671c807d43a0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 16:31:50 +0100 Subject: [PATCH 07/24] Add support for timestamps without time zone --- rust/datafusion/src/logical_plan/expr.rs | 19 +++++++------ .../src/physical_plan/expressions.rs | 28 +++++++++++++++++-- rust/datafusion/src/physical_plan/planner.rs | 1 + rust/datafusion/tests/sql.rs | 5 ++-- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 5d69d536881..a2d3ce609e1 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -254,10 +254,14 @@ impl Expr { DataType::Date32(_) | DataType::Date64(_) | DataType::Time32(_) - | DataType::Time64(_) => Ok(DataType::Int32), - _ => Err(DataFusionError::Internal( - "Only Date and Time datatypes supported in Extract".to_owned(), - )), + | DataType::Time64(_) + | DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None) => { + Ok(DataType::Int32) + } + dt => Err(DataFusionError::Internal(format!( + "Only Date and Time datatypes supported in Extract, got {}", + dt + ))), } } } @@ -1015,11 +1019,8 @@ impl fmt::Debug for Expr { } } Expr::Wildcard => write!(f, "*"), - Expr::Extract { - date_part: DatePart::Hour, - expr, - } => { - write!(f, "EXTRACT(HOUR FROM {:?})", expr) + Expr::Extract { date_part, expr } => { + write!(f, "EXTRACT({} FROM {:?})", date_part, expr) } } } diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs index 3cb07bb8f47..e4ed7213210 100644 --- a/rust/datafusion/src/physical_plan/expressions.rs +++ b/rust/datafusion/src/physical_plan/expressions.rs @@ -2592,14 +2592,36 @@ impl PhysicalExpr for Extract { fn evaluate(&self, batch: &RecordBatch) -> Result { let value = self.expr.evaluate(batch)?; + let data_type = value.data_type(); let array = match value { ColumnarValue::Array(array) => array, ColumnarValue::Scalar(scalar) => scalar.to_array(), }; - let array = array.as_any().downcast_ref::().unwrap(); - - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + match data_type { + DataType::Date32(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + } + DataType::Date64(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + } + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + } + d => { + return Err(DataFusionError::Internal(format!( + "Extract does not support datatype {:?}", + d + ))) + } // DataType::Time32(_) => {} + // DataType::Time64(_) => {} + } } } diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index e17f40091ad..6702f938378 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -650,6 +650,7 @@ impl DefaultPhysicalPlanner { Expr::Extract { date_part, expr } => { let value_expr = self.create_physical_expr(expr, input_schema, ctx_state)?; + expressions::extract(*date_part, value_expr) } other => Err(DataFusionError::NotImplemented(format!( diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 6c169b82329..d08fb0742ef 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1968,11 +1968,12 @@ async fn crypto_expressions() -> Result<()> { async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT - EXTRACT(HOUR FROM CAST('2020-01-01' AS date)) + EXTRACT(HOUR FROM CAST('2020-01-01' AS date)), + EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00')) "; let actual = execute(&mut ctx, sql).await; - let expected = vec![vec!["0"]]; + let expected = vec![vec!["0", "12"]]; assert_eq!(expected, actual); Ok(()) } From 5de73f679dd26152de7c4ed99eb5aa32c34a557b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 16:48:20 +0100 Subject: [PATCH 08/24] Small test changes --- rust/datafusion/tests/sql.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index d08fb0742ef..765dbbe4942 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1672,7 +1672,7 @@ fn make_timestamp_nano_table() -> Result> { } #[tokio::test] -async fn to_timstamp() -> Result<()> { +async fn to_timestamp() -> Result<()> { let mut ctx = ExecutionContext::new(); ctx.register_table("ts_data", make_timestamp_nano_table()?); @@ -1968,8 +1968,8 @@ async fn crypto_expressions() -> Result<()> { async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT - EXTRACT(HOUR FROM CAST('2020-01-01' AS date)), - EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00')) + EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)), + EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) "; let actual = execute(&mut ctx, sql).await; From 7ce35ab09df4fd43231cdc7d165d5fd6fc6cc49d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 17:31:37 +0100 Subject: [PATCH 09/24] Clippy --- rust/datafusion/src/physical_plan/expressions.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs index e4ed7213210..e047dc794cc 100644 --- a/rust/datafusion/src/physical_plan/expressions.rs +++ b/rust/datafusion/src/physical_plan/expressions.rs @@ -2614,13 +2614,10 @@ impl PhysicalExpr for Extract { .unwrap(); Ok(ColumnarValue::Array(Arc::new(hour(array)?))) } - d => { - return Err(DataFusionError::Internal(format!( - "Extract does not support datatype {:?}", - d - ))) - } // DataType::Time32(_) => {} - // DataType::Time64(_) => {} + datatype => Err(DataFusionError::Internal(format!( + "Extract does not support datatype {:?}", + datatype + ))), } } } From 1e96898a7a33023718569d31909a81348b278e5e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 17:35:17 +0100 Subject: [PATCH 10/24] Remove remaining todo!() --- rust/datafusion/src/physical_plan/expressions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs index e047dc794cc..5929eb38dbe 100644 --- a/rust/datafusion/src/physical_plan/expressions.rs +++ b/rust/datafusion/src/physical_plan/expressions.rs @@ -2579,7 +2579,7 @@ impl Extract { impl PhysicalExpr for Extract { fn as_any(&self) -> &dyn Any { - todo!() + self } fn data_type(&self, _input_schema: &Schema) -> Result { From bda3cabfd6b78ba2b4bc84e1fe0a3a5a7dedec62 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 17:38:11 +0100 Subject: [PATCH 11/24] Improve naming --- rust/datafusion/src/physical_plan/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 6702f938378..50339ae25db 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -648,10 +648,10 @@ impl DefaultPhysicalPlanner { } }, Expr::Extract { date_part, expr } => { - let value_expr = + let physical_expr = self.create_physical_expr(expr, input_schema, ctx_state)?; - expressions::extract(*date_part, value_expr) + expressions::extract(*date_part, physical_expr) } other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", From 0cad90f0bbec70c411c24f5985af57b1d63edb59 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 29 Jan 2021 17:40:43 +0100 Subject: [PATCH 12/24] Undo whitespace changes --- rust/datafusion/tests/sql.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 765dbbe4942..8afff889a0c 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -710,8 +710,7 @@ async fn csv_query_cast() -> Result<()> { async fn csv_query_cast_literal() -> Result<()> { let mut ctx = ExecutionContext::new(); register_aggregate_csv(&mut ctx)?; - let sql = - "SELECT c12, CAST(1 AS float) FROM aggregate_test_100 WHERE c12 > CAST(0 AS float) LIMIT 2"; + let sql = "SELECT c12, CAST(1 AS float) FROM aggregate_test_100 WHERE c12 > CAST(0 AS float) LIMIT 2"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["0.9294097332465232", "1"], @@ -1124,7 +1123,8 @@ async fn equijoin() -> Result<()> { #[tokio::test] async fn left_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; let expected = vec![ vec!["11", "a", "z"], From ac62940cde402918f04b062555c5e397904a17e3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 30 Jan 2021 11:25:47 +0100 Subject: [PATCH 13/24] Comment fixes --- rust/datafusion/src/logical_plan/expr.rs | 2 +- rust/datafusion/src/physical_plan/expressions/extract.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 3e88a9c779a..a9cba439a08 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -174,7 +174,7 @@ pub enum Expr { Extract { /// The date part, e.g. `DatePart::Hour` date_part: DatePart, - /// The expression being cast + /// The expression to extract the date part from expr: Box, }, } diff --git a/rust/datafusion/src/physical_plan/expressions/extract.rs b/rust/datafusion/src/physical_plan/expressions/extract.rs index b46f7eb3ee3..7cca219d60b 100644 --- a/rust/datafusion/src/physical_plan/expressions/extract.rs +++ b/rust/datafusion/src/physical_plan/expressions/extract.rs @@ -21,7 +21,7 @@ impl fmt::Display for Extract { } } -/// InList +/// Extract #[derive(Debug)] pub struct Extract { date_part: DatePart, From dc679992a0003425696a3cf4f05b95cc11bfda99 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 30 Jan 2021 14:38:02 +0100 Subject: [PATCH 14/24] Support all time units --- .../src/physical_plan/expressions/extract.rs | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/physical_plan/expressions/extract.rs b/rust/datafusion/src/physical_plan/expressions/extract.rs index 7cca219d60b..203a8ef50b1 100644 --- a/rust/datafusion/src/physical_plan/expressions/extract.rs +++ b/rust/datafusion/src/physical_plan/expressions/extract.rs @@ -3,7 +3,10 @@ use core::fmt; use std::{any::Any, sync::Arc}; use arrow::{ - array::{Date32Array, Date64Array, TimestampNanosecondArray}, + array::{ + Date32Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, + }, compute::hour, datatypes::{DataType, Schema, TimeUnit}, record_batch::RecordBatch, @@ -65,13 +68,36 @@ impl PhysicalExpr for Extract { let array = array.as_any().downcast_ref::().unwrap(); Ok(ColumnarValue::Array(Arc::new(hour(array)?))) } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) - } + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + } + TimeUnit::Millisecond => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + } + TimeUnit::Microsecond => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + } + TimeUnit::Nanosecond => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(hour(array)?))) + } + }, datatype => Err(DataFusionError::Internal(format!( "Extract does not support datatype {:?}", datatype From b4efb645ddd8334bfe251deeb88a8c8372d6bbb2 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 17 Feb 2021 20:21:01 +0100 Subject: [PATCH 15/24] WIP date_part --- rust/datafusion/src/logical_plan/expr.rs | 40 ------------------- rust/datafusion/src/optimizer/utils.rs | 6 --- .../src/physical_plan/datetime_expressions.rs | 32 +++++++++++++++ .../src/physical_plan/expressions/extract.rs | 10 +---- .../datafusion/src/physical_plan/functions.rs | 2 + rust/datafusion/src/physical_plan/planner.rs | 6 --- rust/datafusion/src/sql/planner.rs | 9 +++-- rust/datafusion/src/sql/utils.rs | 4 -- 8 files changed, 41 insertions(+), 68 deletions(-) diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs index 712b0e1e2aa..99ac3d4b9f2 100644 --- a/rust/datafusion/src/logical_plan/expr.rs +++ b/rust/datafusion/src/logical_plan/expr.rs @@ -17,7 +17,6 @@ //! This module provides an `Expr` enum for representing expressions such as `col = 5` or `SUM(col)` -use super::temporal::DatePart; pub use super::Operator; use std::fmt; @@ -170,13 +169,6 @@ pub enum Expr { }, /// Represents a reference to all fields in a schema. Wildcard, - /// Extract date parts (day, hour, minute) from a date / time expression - Extract { - /// The date part, e.g. `DatePart::Hour` - date_part: DatePart, - /// The expression to extract the date part from - expr: Box, - }, } impl Expr { @@ -245,25 +237,6 @@ impl Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), - Expr::Extract { - date_part: DatePart::Hour, - expr, - } => { - let inner = expr.get_type(schema)?; - match inner { - DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) - | DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None) => { - Ok(DataType::Int32) - } - dt => Err(DataFusionError::Internal(format!( - "Only Date and Time datatypes supported in Extract, got {}", - dt - ))), - } - } } } @@ -319,7 +292,6 @@ impl Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), - Expr::Extract { date_part: _, expr } => expr.nullable(input_schema), } } @@ -552,7 +524,6 @@ impl Expr { .try_fold(visitor, |visitor, arg| arg.accept(visitor)) } Expr::Wildcard => Ok(visitor), - Expr::Extract { date_part: _, expr } => expr.accept(visitor), }?; visitor.post_visit(self) @@ -1039,9 +1010,6 @@ impl fmt::Debug for Expr { } } Expr::Wildcard => write!(f, "*"), - Expr::Extract { date_part, expr } => { - write!(f, "EXTRACT({} FROM {:?})", date_part, expr) - } } } } @@ -1146,14 +1114,6 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { Ok(format!("{} IN ({:?})", expr, list)) } } - Expr::Extract { - date_part: DatePart::Hour, - expr, - } => { - let expr = create_name(expr, input_schema)?; - - Ok(format!("EXTRACT(HOUR FROM {})", expr)) - } other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", other diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index 6110812903f..31d18b91801 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -80,7 +80,6 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> { Expr::AggregateUDF { .. } => {} Expr::InList { .. } => {} Expr::Wildcard => {} - Expr::Extract { .. } => {} } Ok(Recursion::Continue(self)) } @@ -334,7 +333,6 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result> { Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), - Expr::Extract { date_part: _, expr } => Ok(vec![expr.as_ref().to_owned()]), } } @@ -447,10 +445,6 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), - Expr::Extract { date_part, expr: _ } => Ok(Expr::Extract { - date_part: *date_part, - expr: Box::new(expressions[0].clone()), - }), } } diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index 8642e3b40e3..ac7e2e0c743 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -344,6 +344,38 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result { }) } +/// DATE_PART SQL function +pub fn date_part(args: &[ColumnarValue]) -> Result { + if args.len() != 2 { + return Err(DataFusionError::Execution( + "Expected two arguments in DATE_PART".to_string(), + )); + } + let (date_part, array) = (&args[0], &args[1]); + + let date_part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = date_part { + v + } else { + return Err(DataFusionError::Execution( + "First argument of `DATE_PART` must be non-null scalar Utf8".to_string(), + )); + }; + + Ok(match array { + ColumnarValue::Array(array) => { + match date_part { + //"HOUR" => temporal::hour(array.as_any().downcast_ref::Pri) + _ => { + unimplemented!() + } + } + } + ColumnarValue::Scalar(array) => { + unimplemented!() + } + }) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/rust/datafusion/src/physical_plan/expressions/extract.rs b/rust/datafusion/src/physical_plan/expressions/extract.rs index 203a8ef50b1..801cac757a7 100644 --- a/rust/datafusion/src/physical_plan/expressions/extract.rs +++ b/rust/datafusion/src/physical_plan/expressions/extract.rs @@ -104,12 +104,4 @@ impl PhysicalExpr for Extract { ))), } } -} - -/// Creates a expression Extract -pub fn extract( - date_part: DatePart, - expr: Arc, -) -> Result> { - Ok(Arc::new(Extract::new(date_part, expr))) -} +} \ No newline at end of file diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index c5cd01f93c5..155a76dcbc3 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -140,6 +140,8 @@ pub enum BuiltinScalarFunction { NullIf, /// Date truncate DateTrunc, + /// Date part + DatePart, /// MD5 MD5, /// SHA224 diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 3ec0f9e51e3..3f2df33d8f8 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -661,12 +661,6 @@ impl DefaultPhysicalPlanner { expressions::in_list(value_expr, list_exprs, negated) } }, - Expr::Extract { date_part, expr } => { - let physical_expr = - self.create_physical_expr(expr, input_schema, ctx_state)?; - - expressions::extract(*date_part, physical_expr) - } other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", other diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 7a0fa48bb5d..dafe143979b 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -730,9 +730,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Extract { field: DateTimeField::Hour, expr, - } => Ok(Expr::Extract { - date_part: DatePart::Hour, - expr: Box::new(self.sql_expr_to_logical_expr(expr)?), + } => Ok(Expr::ScalarFunction { + fun: functions::BuiltinScalarFunction::DatePart, + args: vec![ + Expr::Literal(ScalarValue::Utf8(Some("hour".to_string()))), + self.sql_expr_to_logical_expr(expr)?, + ], }), SQLExpr::Value(Value::Interval { diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs index 7f66552db3e..ffcb772cd0e 100644 --- a/rust/datafusion/src/sql/utils.rs +++ b/rust/datafusion/src/sql/utils.rs @@ -333,10 +333,6 @@ where Ok(expr.clone()) } Expr::Wildcard => Ok(Expr::Wildcard), - Expr::Extract { date_part, expr: e } => Ok(Expr::Extract { - date_part: *date_part, - expr: Box::new(clone_with_replacement(e, replacement_fn)?), - }), }, } } From 4c0dac6c30ef447d918f6e4e7ab3a7294179f19e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 17 Feb 2021 20:48:03 +0100 Subject: [PATCH 16/24] WIP date_part 2 --- rust/datafusion/src/logical_plan/temporal.rs | 18 --- .../src/physical_plan/datetime_expressions.rs | 71 +++++++++--- .../src/physical_plan/expressions/extract.rs | 107 ------------------ .../src/physical_plan/expressions/mod.rs | 2 - .../datafusion/src/physical_plan/functions.rs | 7 ++ rust/datafusion/src/sql/planner.rs | 1 - rust/datafusion/tests/sql.rs | 6 +- 7 files changed, 67 insertions(+), 145 deletions(-) delete mode 100644 rust/datafusion/src/logical_plan/temporal.rs delete mode 100644 rust/datafusion/src/physical_plan/expressions/extract.rs diff --git a/rust/datafusion/src/logical_plan/temporal.rs b/rust/datafusion/src/logical_plan/temporal.rs deleted file mode 100644 index cf533c38121..00000000000 --- a/rust/datafusion/src/logical_plan/temporal.rs +++ /dev/null @@ -1,18 +0,0 @@ -use core::fmt; - -#[derive(Clone, Copy, Debug, PartialEq)] -/// Date part -pub enum DatePart { - /// Hour part of date / time - Hour, -} - -impl fmt::Display for DatePart { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - DatePart::Hour => { - write!(f, "HOUR") - } - } - } -} diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index ac7e2e0c743..5ebb3666009 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -23,13 +23,18 @@ use crate::{ error::{DataFusionError, Result}, scalar::{ScalarType, ScalarValue}, }; -use arrow::temporal_conversions::timestamp_ns_to_datetime; +use arrow::{ + array::{Array, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait}, + datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType}, +}; use arrow::{ array::{ - Array, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait, - TimestampNanosecondArray, + Date32Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, }, - datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType}, + compute::kernels::temporal, + datatypes::TimeUnit, + temporal_conversions::timestamp_ns_to_datetime, }; use chrono::prelude::*; use chrono::Duration; @@ -361,19 +366,55 @@ pub fn date_part(args: &[ColumnarValue]) -> Result { )); }; - Ok(match array { - ColumnarValue::Array(array) => { - match date_part { - //"HOUR" => temporal::hour(array.as_any().downcast_ref::Pri) - _ => { - unimplemented!() - } - } + let array = match array { + ColumnarValue::Array(array) => array.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array(), + }; + + match array.data_type() { + DataType::Date32 => { + let array = array.as_any().downcast_ref::().unwrap(); + Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) } - ColumnarValue::Scalar(array) => { - unimplemented!() + DataType::Date64 => { + let array = array.as_any().downcast_ref::().unwrap(); + Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) } - }) + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) + } + TimeUnit::Millisecond => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) + } + TimeUnit::Microsecond => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) + } + TimeUnit::Nanosecond => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) + } + }, + datatype => Err(DataFusionError::Internal(format!( + "Extract does not support datatype {:?}", + datatype + ))), + } } #[cfg(test)] diff --git a/rust/datafusion/src/physical_plan/expressions/extract.rs b/rust/datafusion/src/physical_plan/expressions/extract.rs deleted file mode 100644 index 801cac757a7..00000000000 --- a/rust/datafusion/src/physical_plan/expressions/extract.rs +++ /dev/null @@ -1,107 +0,0 @@ -use crate::error::Result; -use core::fmt; -use std::{any::Any, sync::Arc}; - -use arrow::{ - array::{ - Date32Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, - }, - compute::hour, - datatypes::{DataType, Schema, TimeUnit}, - record_batch::RecordBatch, -}; - -use crate::{ - error::DataFusionError, - logical_plan::DatePart, - physical_plan::{ColumnarValue, PhysicalExpr}, -}; - -impl fmt::Display for Extract { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "EXTRACT({} AS {:?})", self.date_part, self.expr) - } -} - -/// Extract -#[derive(Debug)] -pub struct Extract { - date_part: DatePart, - expr: Arc, -} - -impl Extract { - /// Create new Extract expression - pub fn new(date_part: DatePart, expr: Arc) -> Self { - Self { date_part, expr } - } -} - -impl PhysicalExpr for Extract { - fn as_any(&self) -> &dyn Any { - self - } - - fn data_type(&self, _input_schema: &Schema) -> Result { - Ok(DataType::Int32) - } - - fn nullable(&self, input_schema: &Schema) -> Result { - self.expr.nullable(input_schema) - } - - fn evaluate(&self, batch: &RecordBatch) -> Result { - let value = self.expr.evaluate(batch)?; - let data_type = value.data_type(); - let array = match value { - ColumnarValue::Array(array) => array, - ColumnarValue::Scalar(scalar) => scalar.to_array(), - }; - - match data_type { - DataType::Date32 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) - } - DataType::Date64 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) - } - DataType::Timestamp(time_unit, None) => match time_unit { - TimeUnit::Second => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) - } - TimeUnit::Millisecond => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) - } - TimeUnit::Microsecond => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) - } - TimeUnit::Nanosecond => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(hour(array)?))) - } - }, - datatype => Err(DataFusionError::Internal(format!( - "Extract does not support datatype {:?}", - datatype - ))), - } - } -} \ No newline at end of file diff --git a/rust/datafusion/src/physical_plan/expressions/mod.rs b/rust/datafusion/src/physical_plan/expressions/mod.rs index a346d47f521..fe5fea1e2e4 100644 --- a/rust/datafusion/src/physical_plan/expressions/mod.rs +++ b/rust/datafusion/src/physical_plan/expressions/mod.rs @@ -33,7 +33,6 @@ mod cast; mod coercion; mod column; mod count; -mod extract; mod in_list; mod is_not_null; mod is_null; @@ -50,7 +49,6 @@ pub use case::{case, CaseExpr}; pub use cast::{cast, CastExpr}; pub use column::{col, Column}; pub use count::Count; -pub use extract::{extract, Extract}; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index 155a76dcbc3..50f4c5363b4 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -193,6 +193,7 @@ impl FromStr for BuiltinScalarFunction { "upper" => BuiltinScalarFunction::Upper, "to_timestamp" => BuiltinScalarFunction::ToTimestamp, "date_trunc" => BuiltinScalarFunction::DateTrunc, + "date_part" => BuiltinScalarFunction::DatePart, "array" => BuiltinScalarFunction::Array, "nullif" => BuiltinScalarFunction::NullIf, "md5" => BuiltinScalarFunction::MD5, @@ -300,6 +301,7 @@ pub fn return_type( BuiltinScalarFunction::DateTrunc => { Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)) } + BuiltinScalarFunction::DatePart => Ok(DataType::Int32), BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList( Box::new(Field::new("item", arg_types[0].clone(), true)), arg_types.len() as i32, @@ -414,6 +416,7 @@ pub fn create_physical_expr( BuiltinScalarFunction::Upper => string_expressions::upper, BuiltinScalarFunction::ToTimestamp => datetime_expressions::to_timestamp, BuiltinScalarFunction::DateTrunc => datetime_expressions::date_trunc, + BuiltinScalarFunction::DatePart => datetime_expressions::date_part, BuiltinScalarFunction::Array => array_expressions::array, }); // coerce @@ -457,6 +460,10 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature { DataType::Utf8, DataType::Timestamp(TimeUnit::Nanosecond, None), ]), + BuiltinScalarFunction::DatePart => Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Nanosecond, None), + ]), BuiltinScalarFunction::Array => { Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec()) } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index dafe143979b..3a80b1df3af 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -21,7 +21,6 @@ use std::str::FromStr; use std::sync::Arc; use crate::datasource::TableProvider; -use crate::logical_plan::DatePart; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, lit, DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 3ffa78e2d57..183b9101b06 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2138,9 +2138,11 @@ async fn crypto_expressions() -> Result<()> { async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT - EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)), - EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) + date_part('hour', CAST('2020-01-01' AS DATE)) "; + // EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)), + //EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) + let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["0", "12"]]; From 63119179f3acc61b81984a7e153a99dc37686746 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 17 Feb 2021 20:56:15 +0100 Subject: [PATCH 17/24] WIP --- rust/datafusion/src/logical_plan/mod.rs | 4 +--- rust/datafusion/src/physical_plan/functions.rs | 8 ++++---- rust/datafusion/tests/sql.rs | 3 +-- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/rust/datafusion/src/logical_plan/mod.rs b/rust/datafusion/src/logical_plan/mod.rs index 6b6b64f1c1c..6335552924d 100644 --- a/rust/datafusion/src/logical_plan/mod.rs +++ b/rust/datafusion/src/logical_plan/mod.rs @@ -29,7 +29,6 @@ mod extension; mod operators; mod plan; mod registry; -mod temporal; pub use builder::LogicalPlanBuilder; pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema}; pub use display::display_schema; @@ -45,5 +44,4 @@ pub use operators::Operator; pub use plan::{ JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, StringifiedPlan, }; -pub use registry::FunctionRegistry; -pub use temporal::DatePart; +pub use registry::FunctionRegistry; \ No newline at end of file diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index 50f4c5363b4..05ccbddd3f0 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -460,10 +460,10 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature { DataType::Utf8, DataType::Timestamp(TimeUnit::Nanosecond, None), ]), - BuiltinScalarFunction::DatePart => Signature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Nanosecond, None), - ]), + BuiltinScalarFunction::DatePart => { + // TODO Utf8 -> Date32, Date64, timestamp etc. + Signature::Variadic(vec![DataType::Utf8, DataType::Date32]) + } BuiltinScalarFunction::Array => { Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec()) } diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 183b9101b06..eacfa2e4b10 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2138,9 +2138,8 @@ async fn crypto_expressions() -> Result<()> { async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT - date_part('hour', CAST('2020-01-01' AS DATE)) + date_part('hour', CAST('2020-01-01' AS DATE)) a "; - // EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)), //EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) let actual = execute(&mut ctx, sql).await; From 9b97b478ea69619ae3cd41c5ac7cd3e64fe3f362 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 17 Feb 2021 20:59:35 +0100 Subject: [PATCH 18/24] WIP --- rust/datafusion/src/physical_plan/functions.rs | 2 +- rust/datafusion/tests/sql.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index 05ccbddd3f0..6e482717d2c 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -462,7 +462,7 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature { ]), BuiltinScalarFunction::DatePart => { // TODO Utf8 -> Date32, Date64, timestamp etc. - Signature::Variadic(vec![DataType::Utf8, DataType::Date32]) + Signature::Exact(vec![DataType::Utf8, DataType::Date32]) } BuiltinScalarFunction::Array => { Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec()) diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index eacfa2e4b10..ff602fecab1 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2139,12 +2139,13 @@ async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT date_part('hour', CAST('2020-01-01' AS DATE)) a + EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) b "; //EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) let actual = execute(&mut ctx, sql).await; - let expected = vec![vec!["0", "12"]]; + let expected = vec![vec!["0", "0"]]; assert_eq!(expected, actual); Ok(()) } From a52e0741728669a9872d084e526f301bdc5e36ad Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 17 Feb 2021 21:01:03 +0100 Subject: [PATCH 19/24] Test fix --- rust/datafusion/tests/sql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index ff602fecab1..de26abc3698 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2138,7 +2138,7 @@ async fn crypto_expressions() -> Result<()> { async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT - date_part('hour', CAST('2020-01-01' AS DATE)) a + date_part('hour', CAST('2020-01-01' AS DATE)) a, EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) b "; //EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) From af8792fb5e4b4a6b74a18149a03ba3bed203d18b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 20 Feb 2021 11:09:15 +0100 Subject: [PATCH 20/24] Add support for more complex function types --- rust/datafusion/src/logical_plan/mod.rs | 2 +- .../datafusion/src/physical_plan/functions.rs | 24 +++++++++- .../src/physical_plan/type_coercion.rs | 48 ++++++++++++------- rust/datafusion/tests/sql.rs | 6 +-- 4 files changed, 58 insertions(+), 22 deletions(-) diff --git a/rust/datafusion/src/logical_plan/mod.rs b/rust/datafusion/src/logical_plan/mod.rs index 6335552924d..8344ecdaa4d 100644 --- a/rust/datafusion/src/logical_plan/mod.rs +++ b/rust/datafusion/src/logical_plan/mod.rs @@ -44,4 +44,4 @@ pub use operators::Operator; pub use plan::{ JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, StringifiedPlan, }; -pub use registry::FunctionRegistry; \ No newline at end of file +pub use registry::FunctionRegistry; diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index 6e482717d2c..54252ab2e23 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -71,6 +71,8 @@ pub enum Signature { Exact(Vec), /// fixed number of arguments of arbitrary types Any(usize), + /// One of a list of signatures + OneOf(Vec), } /// Scalar function @@ -461,8 +463,26 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature { DataType::Timestamp(TimeUnit::Nanosecond, None), ]), BuiltinScalarFunction::DatePart => { - // TODO Utf8 -> Date32, Date64, timestamp etc. - Signature::Exact(vec![DataType::Utf8, DataType::Date32]) + Signature::OneOf(vec![ + Signature::Exact(vec![DataType::Utf8, DataType::Date32]), + Signature::Exact(vec![DataType::Utf8, DataType::Date64]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Second, None), + ]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Microsecond, None), + ]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Millisecond, None), + ]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Nanosecond, None), + ]), + ]) } BuiltinScalarFunction::Array => { Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec()) diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs index a84707a48df..139ce4804a2 100644 --- a/rust/datafusion/src/physical_plan/type_coercion.rs +++ b/rust/datafusion/src/physical_plan/type_coercion.rs @@ -29,7 +29,7 @@ //! i64. However, i64 -> i32 is never performed as there are i64 //! values which can not be represented by i32 values. -use std::sync::Arc; +use std::{sync::Arc, vec}; use arrow::datatypes::{DataType, Schema, TimeUnit}; @@ -68,6 +68,29 @@ pub fn data_types( current_types: &[DataType], signature: &Signature, ) -> Result> { + let valid_types = get_valid_types(signature, current_types)?; + + if valid_types.contains(¤t_types.to_owned()) { + return Ok(current_types.to_vec()); + } + + for valid_types in valid_types { + if let Some(types) = maybe_data_types(&valid_types, ¤t_types) { + return Ok(types); + } + } + + // none possible -> Error + Err(DataFusionError::Plan(format!( + "Coercion from {:?} to the signature {:?} failed.", + current_types, signature + ))) +} + +fn get_valid_types( + signature: &Signature, + current_types: &[DataType], +) -> Result>> { let valid_types = match signature { Signature::Variadic(valid_types) => valid_types .iter() @@ -95,23 +118,16 @@ pub fn data_types( } vec![(0..*number).map(|i| current_types[i].clone()).collect()] } - }; - - if valid_types.contains(¤t_types.to_owned()) { - return Ok(current_types.to_vec()); - } - - for valid_types in valid_types { - if let Some(types) = maybe_data_types(&valid_types, ¤t_types) { - return Ok(types); + Signature::OneOf(types) => { + let mut r = vec![]; + for s in types { + r.extend(get_valid_types(s, current_types)?); + } + r } - } + }; - // none possible -> Error - Err(DataFusionError::Plan(format!( - "Coercion from {:?} to the signature {:?} failed.", - current_types, signature - ))) + Ok(valid_types) } /// Try to coerce current_types into valid_types. diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index de26abc3698..a3e96dd726f 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2139,13 +2139,13 @@ async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT date_part('hour', CAST('2020-01-01' AS DATE)) a, - EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) b + EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) b, + EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) c "; - //EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) let actual = execute(&mut ctx, sql).await; - let expected = vec![vec!["0", "0"]]; + let expected = vec![vec!["0", "0", "12"]]; assert_eq!(expected, actual); Ok(()) } From e538726910f32fc48fe837fd1cedc67a0f5da100 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 20 Feb 2021 11:44:31 +0100 Subject: [PATCH 21/24] Support both year and hour based on argument --- .../src/physical_plan/datetime_expressions.rs | 101 ++++++++++-------- rust/datafusion/src/sql/planner.rs | 7 +- rust/datafusion/tests/sql.rs | 10 +- 3 files changed, 64 insertions(+), 54 deletions(-) diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index 5ebb3666009..cf6b2ddaf96 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -16,9 +16,9 @@ // under the License. //! DateTime expressions - use std::sync::Arc; +use super::ColumnarValue; use crate::{ error::{DataFusionError, Result}, scalar::{ScalarType, ScalarValue}, @@ -40,8 +40,6 @@ use chrono::prelude::*; use chrono::Duration; use chrono::LocalResult; -use super::ColumnarValue; - #[inline] /// Accepts a string in RFC3339 / ISO8601 standard format and some /// variants and converts it to a nanosecond precision timestamp. @@ -349,6 +347,55 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result { }) } +macro_rules! extract_date_part { + ($ARRAY: expr, $FN:expr) => { + match $ARRAY.data_type() { + DataType::Date32 => { + let array = $ARRAY.as_any().downcast_ref::().unwrap(); + Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + } + DataType::Date64 => { + let array = $ARRAY.as_any().downcast_ref::().unwrap(); + Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + } + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + } + TimeUnit::Millisecond => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + } + TimeUnit::Microsecond => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + } + TimeUnit::Nanosecond => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + } + }, + datatype => Err(DataFusionError::Internal(format!( + "Extract does not support datatype {:?}", + datatype + ))), + } + }; +} + /// DATE_PART SQL function pub fn date_part(args: &[ColumnarValue]) -> Result { if args.len() != 2 { @@ -371,48 +418,12 @@ pub fn date_part(args: &[ColumnarValue]) -> Result { ColumnarValue::Scalar(scalar) => scalar.to_array(), }; - match array.data_type() { - DataType::Date32 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) - } - DataType::Date64 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) - } - DataType::Timestamp(time_unit, None) => match time_unit { - TimeUnit::Second => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) - } - TimeUnit::Millisecond => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) - } - TimeUnit::Microsecond => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) - } - TimeUnit::Nanosecond => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(temporal::hour(array)?))) - } - }, - datatype => Err(DataFusionError::Internal(format!( - "Extract does not support datatype {:?}", - datatype + match date_part.to_lowercase().as_str() { + "hour" => extract_date_part!(array, temporal::hour), + "year" => extract_date_part!(array, temporal::year), + _ => Err(DataFusionError::Execution(format!( + "Date part '{}' not supported", + date_part ))), } } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 3a80b1df3af..f985b506536 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -726,13 +726,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)), SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))), - SQLExpr::Extract { - field: DateTimeField::Hour, - expr, - } => Ok(Expr::ScalarFunction { + SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction { fun: functions::BuiltinScalarFunction::DatePart, args: vec![ - Expr::Literal(ScalarValue::Utf8(Some("hour".to_string()))), + Expr::Literal(ScalarValue::Utf8(Some(format!("{}", field)))), self.sql_expr_to_logical_expr(expr)?, ], }), diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index a3e96dd726f..2f780b662b8 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2138,14 +2138,16 @@ async fn crypto_expressions() -> Result<()> { async fn extract_date_part() -> Result<()> { let mut ctx = ExecutionContext::new(); let sql = "SELECT - date_part('hour', CAST('2020-01-01' AS DATE)) a, - EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) b, - EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) c + date_part('hour', CAST('2020-01-01' AS DATE)) AS hr1, + EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) AS hr2, + EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS hr3, + date_part('YEAR', CAST('2000-01-01' AS DATE)) AS year1, + EXTRACT(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS year2 "; let actual = execute(&mut ctx, sql).await; - let expected = vec![vec!["0", "0", "12"]]; + let expected = vec![vec!["0", "0", "12", "2000", "2020"]]; assert_eq!(expected, actual); Ok(()) } From ca771de427fd207655479cd6b6f33d2dc97e68f7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 20 Feb 2021 11:56:28 +0100 Subject: [PATCH 22/24] Fmt --- .../datafusion/src/physical_plan/functions.rs | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index 54252ab2e23..8d837be8a47 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -462,28 +462,26 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature { DataType::Utf8, DataType::Timestamp(TimeUnit::Nanosecond, None), ]), - BuiltinScalarFunction::DatePart => { - Signature::OneOf(vec![ - Signature::Exact(vec![DataType::Utf8, DataType::Date32]), - Signature::Exact(vec![DataType::Utf8, DataType::Date64]), - Signature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Second, None), - ]), - Signature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Microsecond, None), - ]), - Signature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Millisecond, None), - ]), - Signature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Nanosecond, None), - ]), - ]) - } + BuiltinScalarFunction::DatePart => Signature::OneOf(vec![ + Signature::Exact(vec![DataType::Utf8, DataType::Date32]), + Signature::Exact(vec![DataType::Utf8, DataType::Date64]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Second, None), + ]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Microsecond, None), + ]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Millisecond, None), + ]), + Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Nanosecond, None), + ]), + ]), BuiltinScalarFunction::Array => { Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec()) } From a23e4c9dfdf98e643f4b1387d70dcf4a69766a31 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 20 Feb 2021 12:56:31 +0100 Subject: [PATCH 23/24] Return scalar values --- .../src/physical_plan/datetime_expressions.rs | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index cf6b2ddaf96..3d363ce97d2 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -24,7 +24,7 @@ use crate::{ scalar::{ScalarType, ScalarValue}, }; use arrow::{ - array::{Array, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait}, + array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait}, datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType}, }; use arrow::{ @@ -352,11 +352,11 @@ macro_rules! extract_date_part { match $ARRAY.data_type() { DataType::Date32 => { let array = $ARRAY.as_any().downcast_ref::().unwrap(); - Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + Ok($FN(array)?) } DataType::Date64 => { let array = $ARRAY.as_any().downcast_ref::().unwrap(); - Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + Ok($FN(array)?) } DataType::Timestamp(time_unit, None) => match time_unit { TimeUnit::Second => { @@ -364,28 +364,28 @@ macro_rules! extract_date_part { .as_any() .downcast_ref::() .unwrap(); - Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + Ok($FN(array)?) } TimeUnit::Millisecond => { let array = $ARRAY .as_any() .downcast_ref::() .unwrap(); - Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + Ok($FN(array)?) } TimeUnit::Microsecond => { let array = $ARRAY .as_any() .downcast_ref::() .unwrap(); - Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + Ok($FN(array)?) } TimeUnit::Nanosecond => { let array = $ARRAY .as_any() .downcast_ref::() .unwrap(); - Ok(ColumnarValue::Array(Arc::new($FN(array)?))) + Ok($FN(array)?) } }, datatype => Err(DataFusionError::Internal(format!( @@ -413,19 +413,30 @@ pub fn date_part(args: &[ColumnarValue]) -> Result { )); }; + let is_scalar = matches!(array, ColumnarValue::Scalar(_)); + let array = match array { ColumnarValue::Array(array) => array.clone(), ColumnarValue::Scalar(scalar) => scalar.to_array(), }; - match date_part.to_lowercase().as_str() { + let arr = match date_part.to_lowercase().as_str() { "hour" => extract_date_part!(array, temporal::hour), "year" => extract_date_part!(array, temporal::year), _ => Err(DataFusionError::Execution(format!( "Date part '{}' not supported", date_part ))), - } + }?; + + Ok(if is_scalar { + ColumnarValue::Scalar(ScalarValue::try_from_array( + &(Arc::new(arr) as ArrayRef), + 0, + )?) + } else { + ColumnarValue::Array(Arc::new(arr)) + }) } #[cfg(test)] From aa149e7d2ea4b11106fc980da806fd82d81b3480 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 20 Feb 2021 14:19:13 +0100 Subject: [PATCH 24/24] Avoid copy --- rust/datafusion/src/physical_plan/type_coercion.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs index 139ce4804a2..ae920cb870f 100644 --- a/rust/datafusion/src/physical_plan/type_coercion.rs +++ b/rust/datafusion/src/physical_plan/type_coercion.rs @@ -70,7 +70,10 @@ pub fn data_types( ) -> Result> { let valid_types = get_valid_types(signature, current_types)?; - if valid_types.contains(¤t_types.to_owned()) { + if valid_types + .iter() + .any(|data_type| data_type == current_types) + { return Ok(current_types.to_vec()); }