From 6aee5ca95f8b019437eae6759ad0bb40a2d462b2 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sat, 6 Feb 2021 16:11:21 +0300 Subject: [PATCH] ARROW-10816: [Rust][DF] Initial support for Interval expressions --- rust/arrow/src/datatypes.rs | 2 +- rust/arrow/src/util/display.rs | 70 +++++++++- rust/datafusion/src/scalar.rs | 46 ++++++- rust/datafusion/src/sql/planner.rs | 205 ++++++++++++++++++++++++++++- rust/datafusion/tests/sql.rs | 71 ++++++++++ 5 files changed, 385 insertions(+), 9 deletions(-) diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index baa6f1c37cb..2231da81c37 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -169,7 +169,7 @@ pub enum IntervalUnit { /// Indicates the number of elapsed whole months, stored as 4-byte integers. YearMonth, /// Indicates the number of elapsed days and milliseconds, - /// stored as 2 contiguous 32-bit integers (8-bytes in total). + /// stored as 2 contiguous 32-bit integers (days, milliseconds) (8-bytes in total). DayTime, } diff --git a/rust/arrow/src/util/display.rs b/rust/arrow/src/util/display.rs index bb0d460b346..eb4a5fcec4b 100644 --- a/rust/arrow/src/util/display.rs +++ b/rust/arrow/src/util/display.rs @@ -19,12 +19,12 @@ //! purposes. See the `pretty` crate for additional functions for //! record batch pretty printing. -use crate::array; use crate::array::Array; use crate::datatypes::{ ArrowNativeType, ArrowPrimitiveType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; +use crate::{array, datatypes::IntervalUnit}; use array::DictionaryArray; @@ -44,6 +44,66 @@ macro_rules! make_string { }}; } +macro_rules! make_string_interval_year_month { + ($column: ident, $row: ident) => {{ + let array = $column + .as_any() + .downcast_ref::() + .unwrap(); + + let s = if array.is_null($row) { + "NULL".to_string() + } else { + let interval = array.value($row) as f64; + let years = (interval / 12_f64).floor(); + let month = interval - (years * 12_f64); + + format!( + "{} years {} mons 0 days 0 hours 0 mins 0.00 secs", + years, month, + ) + }; + + Ok(s) + }}; +} + +macro_rules! make_string_interval_day_time { + ($column: ident, $row: ident) => {{ + let array = $column + .as_any() + .downcast_ref::() + .unwrap(); + + let s = if array.is_null($row) { + "NULL".to_string() + } else { + let value: u64 = array.value($row) as u64; + + let days_parts: i32 = ((value & 0xFFFFFFFF00000000) >> 32) as i32; + let milliseconds_part: i32 = (value & 0xFFFFFFFF) as i32; + + let secs = milliseconds_part / 1000; + let mins = secs / 60; + let hours = mins / 60; + + let secs = secs - (mins * 60); + let mins = mins - (hours * 60); + + format!( + "0 years 0 mons {} days {} hours {} mins {}.{:02} secs", + days_parts, + hours, + mins, + secs, + (milliseconds_part % 1000), + ) + }; + + Ok(s) + }}; +} + macro_rules! make_string_date { ($array_type:ty, $column: ident, $row: ident) => {{ let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); @@ -180,6 +240,14 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result { make_string_time!(array::Time64NanosecondArray, column, row) } + DataType::Interval(unit) => match unit { + IntervalUnit::DayTime => { + make_string_interval_day_time!(column, row) + } + IntervalUnit::YearMonth => { + make_string_interval_year_month!(column, row) + } + }, DataType::List(_) => make_string_from_list!(column, row), DataType::Dictionary(index_type, _value_type) => match **index_type { DataType::Int8 => dict_array_value_to_string::(column, row), diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs index b86f4e5c086..35b15907fde 100644 --- a/rust/datafusion/src/scalar.rs +++ b/rust/datafusion/src/scalar.rs @@ -19,11 +19,6 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; -use arrow::array::{ - Array, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array, Int32Array, - Int64Array, Int8Array, LargeStringArray, ListArray, StringArray, UInt16Array, - UInt32Array, UInt64Array, UInt8Array, -}; use arrow::array::{ Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Builder, UInt32Builder, @@ -33,6 +28,15 @@ use arrow::{ array::ArrayRef, datatypes::{DataType, Field}, }; +use arrow::{ + array::{ + Array, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array, + Int32Array, Int64Array, Int8Array, IntervalDayTimeArray, IntervalYearMonthArray, + LargeStringArray, ListArray, StringArray, UInt16Array, UInt32Array, UInt64Array, + UInt8Array, + }, + datatypes::IntervalUnit, +}; use crate::error::{DataFusionError, Result}; use arrow::datatypes::TimeUnit; @@ -75,6 +79,10 @@ pub enum ScalarValue { TimeMicrosecond(Option), /// Timestamp Nanoseconds TimeNanosecond(Option), + /// Interval with YearMonth unit + IntervalYearMonth(Option), + /// Interval with DayTime unit + IntervalDayTime(Option), } macro_rules! typed_cast { @@ -148,6 +156,10 @@ impl ScalarValue { DataType::List(Box::new(Field::new("item", data_type.clone(), true))) } ScalarValue::Date32(_) => DataType::Date32, + ScalarValue::IntervalYearMonth(_) => { + DataType::Interval(IntervalUnit::YearMonth) + } + ScalarValue::IntervalDayTime(_) => DataType::Interval(IntervalUnit::DayTime), } } @@ -317,6 +329,22 @@ impl ScalarValue { } None => Arc::new(repeat(None).take(size).collect::()), }, + ScalarValue::IntervalDayTime(e) => match e { + Some(value) => Arc::new(IntervalDayTimeArray::from_iter_values( + repeat(*value).take(size), + )), + None => { + Arc::new(repeat(None).take(size).collect::()) + } + }, + ScalarValue::IntervalYearMonth(e) => match e { + Some(value) => Arc::new(IntervalYearMonthArray::from_iter_values( + repeat(*value).take(size), + )), + None => { + Arc::new(repeat(None).take(size).collect::()) + } + }, } } @@ -552,6 +580,8 @@ impl fmt::Display for ScalarValue { None => write!(f, "NULL")?, }, ScalarValue::Date32(e) => format_option!(f, e)?, + ScalarValue::IntervalDayTime(e) => format_option!(f, e)?, + ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?, }; Ok(()) } @@ -579,6 +609,12 @@ impl fmt::Debug for ScalarValue { ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self), ScalarValue::List(_, _) => write!(f, "List([{}])", self), ScalarValue::Date32(_) => write!(f, "Date32(\"{}\")", self), + ScalarValue::IntervalDayTime(_) => { + write!(f, "IntervalDayTime(\"{}\")", self) + } + ScalarValue::IntervalYearMonth(_) => { + write!(f, "IntervalYearMonth(\"{}\")", self) + } } } } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 2b6aec302e3..a90def60b62 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -41,8 +41,8 @@ use arrow::datatypes::*; use crate::prelude::JoinType; use sqlparser::ast::{ - BinaryOperator, DataType as SQLDataType, Expr as SQLExpr, FunctionArg, Join, - JoinConstraint, JoinOperator, Query, Select, SelectItem, SetExpr, TableFactor, + BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, + Join, JoinConstraint, JoinOperator, Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, UnaryOperator, Value, }; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; @@ -724,6 +724,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))), + SQLExpr::Value(Value::Interval { + value, + leading_field, + leading_precision, + last_field, + fractional_seconds_precision, + }) => self.sql_interval_to_literal( + value, + leading_field, + leading_precision, + last_field, + fractional_seconds_precision, + ), + SQLExpr::Identifier(ref id) => { if &id.value[0..1] == "@" { let var_names = vec![id.value.clone()]; @@ -982,6 +996,173 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ))), } } + + fn sql_interval_to_literal( + &self, + value: &str, + leading_field: &Option, + leading_precision: &Option, + last_field: &Option, + fractional_seconds_precision: &Option, + ) -> Result { + if leading_field.is_some() { + return Err(DataFusionError::NotImplemented(format!( + "Unsupported Interval Expression with leading_field {:?}", + leading_field + ))); + } + + if leading_precision.is_some() { + return Err(DataFusionError::NotImplemented(format!( + "Unsupported Interval Expression with leading_precision {:?}", + leading_precision + ))); + } + + if last_field.is_some() { + return Err(DataFusionError::NotImplemented(format!( + "Unsupported Interval Expression with last_field {:?}", + last_field + ))); + } + + if fractional_seconds_precision.is_some() { + return Err(DataFusionError::NotImplemented(format!( + "Unsupported Interval Expression with fractional_seconds_precision {:?}", + fractional_seconds_precision + ))); + } + + const SECONDS_PER_HOUR: f32 = 3_600_f32; + const MILLIS_PER_SECOND: f32 = 1_000_f32; + + // We are storing parts as integers, it's why we need to align parts fractional + // INTERVAL '0.5 MONTH' = 15 days, INTERVAL '1.5 MONTH' = 1 month 15 days + // INTERVAL '0.5 DAY' = 12 hours, INTERVAL '1.5 DAY' = 1 day 12 hours + let align_interval_parts = |month_part: f32, + mut day_part: f32, + mut milles_part: f32| + -> (i32, i32, f32) { + // Convert fractional month to days, It's not supported by Arrow types, but anyway + day_part += (month_part - (month_part as i32) as f32) * 30_f32; + + // Convert fractional days to hours + milles_part += (day_part - ((day_part as i32) as f32)) + * 24_f32 + * SECONDS_PER_HOUR + * MILLIS_PER_SECOND; + + (month_part as i32, day_part as i32, milles_part) + }; + + let calculate_from_part = |interval_period_str: &str, + interval_type: &str| + -> Result<(i32, i32, f32)> { + // @todo It's better to use Decimal in order to protect rounding errors + // Wait https://github.com/apache/arrow/pull/9232 + let interval_period = match f32::from_str(interval_period_str) { + Ok(n) => n, + Err(_) => { + return Err(DataFusionError::SQL(ParserError(format!( + "Unsupported Interval Expression with value {:?}", + value + )))) + } + }; + + if interval_period > (i32::MAX as f32) { + return Err(DataFusionError::NotImplemented(format!( + "Interval field value out of range: {:?}", + value + ))); + } + + match interval_type.to_lowercase().as_str() { + "year" => Ok(align_interval_parts(interval_period * 12_f32, 0.0, 0.0)), + "month" => Ok(align_interval_parts(interval_period, 0.0, 0.0)), + "day" | "days" => Ok(align_interval_parts(0.0, interval_period, 0.0)), + "hour" | "hours" => { + Ok((0, 0, interval_period * SECONDS_PER_HOUR * MILLIS_PER_SECOND)) + } + "minutes" | "minute" => { + Ok((0, 0, interval_period * 60_f32 * MILLIS_PER_SECOND)) + } + "seconds" | "second" => Ok((0, 0, interval_period * MILLIS_PER_SECOND)), + "milliseconds" | "millisecond" => Ok((0, 0, interval_period)), + _ => Err(DataFusionError::NotImplemented(format!( + "Invalid input syntax for type interval: {:?}", + value + ))), + } + }; + + let mut result_month: i64 = 0; + let mut result_days: i64 = 0; + let mut result_millis: i64 = 0; + + let mut parts = value.split_whitespace(); + + loop { + let interval_period_str = parts.next(); + if interval_period_str.is_none() { + break; + } + + let (diff_month, diff_days, diff_millis) = calculate_from_part( + interval_period_str.unwrap(), + parts.next().unwrap_or("second"), + )?; + + result_month += diff_month as i64; + + if result_month > (i32::MAX as i64) { + return Err(DataFusionError::NotImplemented(format!( + "Interval field value out of range: {:?}", + value + ))); + } + + result_days += diff_days as i64; + + if result_days > (i32::MAX as i64) { + return Err(DataFusionError::NotImplemented(format!( + "Interval field value out of range: {:?}", + value + ))); + } + + result_millis += diff_millis as i64; + + if result_millis > (i32::MAX as i64) { + return Err(DataFusionError::NotImplemented(format!( + "Interval field value out of range: {:?}", + value + ))); + } + } + + // Interval is tricky thing + // 1 day is not 24 hours because timezones, 1 year != 365/364! 30 days != 1 month + // The true way to store and calculate intervals is to store it as it defined + // Due the fact that Arrow supports only two types YearMonth (month) and DayTime (day, time) + // It's not possible to store complex intervals + // It's possible to do select (NOW() + INTERVAL '1 year') + INTERVAL '1 day'; as workaround + if result_month != 0 && (result_days != 0 || result_millis != 0) { + return Err(DataFusionError::NotImplemented(format!( + "DF does not support intervals that have both a Year/Month part as well as Days/Hours/Mins/Seconds: {:?}. Hint: try breaking the interval into two parts, one with Year/Month and the other with Days/Hours/Mins/Seconds - e.g. (NOW() + INTERVAL '1 year') + INTERVAL '1 day'", + value + ))); + } + + if result_month != 0 { + return Ok(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + result_month as i32, + )))); + } + + let result: i64 = (result_days << 32) | result_millis; + Ok(Expr::Literal(ScalarValue::IntervalDayTime(Some(result)))) + } } /// Remove join expressions from a filter expression @@ -1768,6 +1949,26 @@ mod tests { ); } + #[test] + fn select_interval_out_of_range() { + let sql = "SELECT INTERVAL '100000000000000000 day'"; + let err = logical_plan(sql).expect_err("query should have failed"); + assert_eq!( + "NotImplemented(\"Interval field value out of range: \\\"100000000000000000 day\\\"\")", + format!("{:?}", err) + ); + } + + #[test] + fn select_unsupported_complex_interval() { + let sql = "SELECT INTERVAL '1 year 1 day'"; + let err = logical_plan(sql).expect_err("query should have failed"); + assert_eq!( + "NotImplemented(\"DF does not support intervals that have both a Year/Month part as well as Days/Hours/Mins/Seconds: \\\"1 year 1 day\\\". Hint: try breaking the interval into two parts, one with Year/Month and the other with Days/Hours/Mins/Seconds - e.g. (NOW() + INTERVAL \\\'1 year\\\') + INTERVAL \\\'1 day\\\'\")", + format!("{:?}", err) + ); + } + #[test] fn select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby() { quick_test( diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 0c2ff6c863f..653d1baa978 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2027,6 +2027,77 @@ async fn boolean_expressions() -> Result<()> { Ok(()) } +#[tokio::test] +async fn interval_expressions() -> Result<()> { + let mut ctx = ExecutionContext::new(); + let sql = "SELECT + (interval '1') as interval_1, + (interval '1 second') as interval_2, + (interval '500 milliseconds') as interval_3, + (interval '5 second') as interval_4, + (interval '1 minute') as interval_5, + (interval '0.5 minute') as interval_6, + (interval '.5 minute') as interval_7, + (interval '5 minute') as interval_8, + (interval '5 minute 1 second') as interval_9, + (interval '1 hour') as interval_10, + (interval '5 hour') as interval_11, + (interval '1 day') as interval_12, + (interval '1 day 1') as interval_13, + (interval '0.5') as interval_14, + (interval '0.5 day 1') as interval_15, + (interval '0.49 day') as interval_16, + (interval '0.499 day') as interval_17, + (interval '0.4999 day') as interval_18, + (interval '0.49999 day') as interval_19, + (interval '0.49999999999 day') as interval_20, + (interval '5 day') as interval_21, + (interval '5 day 4 hours 3 minutes 2 seconds 100 milliseconds') as interval_22, + (interval '0.5 month') as interval_23, + (interval '1 month') as interval_24, + (interval '5 month') as interval_25, + (interval '13 month') as interval_26, + (interval '0.5 year') as interval_27, + (interval '1 year') as interval_28, + (interval '2 year') as interval_29 + "; + let actual = execute(&mut ctx, sql).await; + + let expected = vec![vec![ + "0 years 0 mons 0 days 0 hours 0 mins 1.00 secs", + "0 years 0 mons 0 days 0 hours 0 mins 1.00 secs", + "0 years 0 mons 0 days 0 hours 0 mins 0.500 secs", + "0 years 0 mons 0 days 0 hours 0 mins 5.00 secs", + "0 years 0 mons 0 days 0 hours 1 mins 0.00 secs", + "0 years 0 mons 0 days 0 hours 0 mins 30.00 secs", + "0 years 0 mons 0 days 0 hours 0 mins 30.00 secs", + "0 years 0 mons 0 days 0 hours 5 mins 0.00 secs", + "0 years 0 mons 0 days 0 hours 5 mins 1.00 secs", + "0 years 0 mons 0 days 1 hours 0 mins 0.00 secs", + "0 years 0 mons 0 days 5 hours 0 mins 0.00 secs", + "0 years 0 mons 1 days 0 hours 0 mins 0.00 secs", + "0 years 0 mons 1 days 0 hours 0 mins 1.00 secs", + "0 years 0 mons 0 days 0 hours 0 mins 0.500 secs", + "0 years 0 mons 0 days 12 hours 0 mins 1.00 secs", + "0 years 0 mons 0 days 11 hours 45 mins 36.00 secs", + "0 years 0 mons 0 days 11 hours 58 mins 33.596 secs", + "0 years 0 mons 0 days 11 hours 59 mins 51.364 secs", + "0 years 0 mons 0 days 11 hours 59 mins 59.136 secs", + "0 years 0 mons 0 days 12 hours 0 mins 0.00 secs", + "0 years 0 mons 5 days 0 hours 0 mins 0.00 secs", + "0 years 0 mons 5 days 4 hours 3 mins 2.100 secs", + "0 years 0 mons 15 days 0 hours 0 mins 0.00 secs", + "0 years 1 mons 0 days 0 hours 0 mins 0.00 secs", + "0 years 5 mons 0 days 0 hours 0 mins 0.00 secs", + "1 years 1 mons 0 days 0 hours 0 mins 0.00 secs", + "0 years 6 mons 0 days 0 hours 0 mins 0.00 secs", + "1 years 0 mons 0 days 0 hours 0 mins 0.00 secs", + "2 years 0 mons 0 days 0 hours 0 mins 0.00 secs", + ]]; + assert_eq!(expected, actual); + Ok(()) +} + #[tokio::test] async fn crypto_expressions() -> Result<()> { let mut ctx = ExecutionContext::new();