diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs index 257fa4658af4c..ba916a6586eba 100644 --- a/datafusion/core/tests/sql/timestamp.rs +++ b/datafusion/core/tests/sql/timestamp.rs @@ -17,6 +17,7 @@ use super::*; use datafusion::from_slice::FromSlice; +use std::ops::Add; #[tokio::test] async fn query_cast_timestamp_millis() -> Result<()> { @@ -1256,3 +1257,141 @@ async fn date_bin() { "Arrow error: External error: This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays" ); } + +#[tokio::test] +async fn timestamp_add_interval_second() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "SELECT NOW(), NOW() + INTERVAL '1' SECOND;"; + let results = execute_to_batches(&ctx, sql).await; + let actual = result_vec(&results); + + let res1 = actual[0][0].as_str(); + let res2 = actual[0][1].as_str(); + + let format = "%Y-%m-%d %H:%M:%S%.6f"; + let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap(); + let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap(); + + assert_eq!(t1_naive.add(Duration::seconds(1)), t2_naive); + Ok(()) +} + +#[tokio::test] +async fn timestamp_sub_interval_days() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "SELECT NOW(), NOW() - INTERVAL '8' DAY;"; + let results = execute_to_batches(&ctx, sql).await; + let actual = result_vec(&results); + + let res1 = actual[0][0].as_str(); + let res2 = actual[0][1].as_str(); + + let format = "%Y-%m-%d %H:%M:%S%.6f"; + let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap(); + let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap(); + + assert_eq!(t1_naive.sub(Duration::days(8)), t2_naive); + Ok(()) +} + +#[tokio::test] +async fn timestamp_add_interval_months() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "SELECT NOW(), NOW() + INTERVAL '4' MONTH;"; + let results = execute_to_batches(&ctx, sql).await; + let actual = result_vec(&results); + + let res1 = actual[0][0].as_str(); + let res2 = actual[0][1].as_str(); + + let format = "%Y-%m-%d %H:%M:%S%.6f"; + let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap(); + let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap(); + + assert_eq!(t1_naive.with_month(t1_naive.month() + 4).unwrap(), t2_naive); + Ok(()) +} + +#[tokio::test] +async fn timestamp_sub_interval_years() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "SELECT NOW(), NOW() - INTERVAL '16' YEAR;"; + let results = execute_to_batches(&ctx, sql).await; + let actual = result_vec(&results); + + let res1 = actual[0][0].as_str(); + let res2 = actual[0][1].as_str(); + + let format = "%Y-%m-%d %H:%M:%S%.6f"; + let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap(); + let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap(); + + assert_eq!(t1_naive.with_year(t1_naive.year() - 16).unwrap(), t2_naive); + Ok(()) +} + +#[tokio::test] +async fn timestamp_array_add_interval() -> Result<()> { + let ctx = SessionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT ts, ts - INTERVAL '8' MILLISECONDS FROM table_a"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------------------+---------------------------------------+", + "| ts | table_a.ts Minus IntervalDayTime(\"8\") |", + "+----------------------------+---------------------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.182855 |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.182855 |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.182855 |", + "+----------------------------+---------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + let sql = "SELECT ts, ts + INTERVAL '1' SECOND FROM table_b"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------------------+-----------------------------------------+", + "| ts | table_b.ts Plus IntervalDayTime(\"1000\") |", + "+----------------------------+-----------------------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:30.190855 |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:30.190855 |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:30.190855 |", + "+----------------------------+-----------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + let sql = "SELECT ts, ts + INTERVAL '2' MONTH FROM table_b"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------------------+----------------------------------------+", + "| ts | table_b.ts Plus IntervalYearMonth(\"2\") |", + "+----------------------------+----------------------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-11-08 13:42:29.190855 |", + "| 2020-09-08 12:42:29.190855 | 2020-11-08 12:42:29.190855 |", + "| 2020-09-08 11:42:29.190855 | 2020-11-08 11:42:29.190855 |", + "+----------------------------+----------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + let sql = "SELECT ts, ts - INTERVAL '16' YEAR FROM table_b"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------------------+-------------------------------------------+", + "| ts | table_b.ts Minus IntervalYearMonth(\"192\") |", + "+----------------------------+-------------------------------------------+", + "| 2020-09-08 13:42:29.190855 | 2004-09-08 13:42:29.190855 |", + "| 2020-09-08 12:42:29.190855 | 2004-09-08 12:42:29.190855 |", + "| 2020-09-08 11:42:29.190855 | 2004-09-08 11:42:29.190855 |", + "+----------------------------+-------------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + Ok(()) +} diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs index d6994d68847a3..a506412d02a8d 100644 --- a/datafusion/expr/src/binary_rule.rs +++ b/datafusion/expr/src/binary_rule.rs @@ -95,7 +95,9 @@ pub fn coerce_types( Operator::Like | Operator::NotLike => like_coercion(lhs_type, rhs_type), // date +/- interval returns date Operator::Plus | Operator::Minus - if (*lhs_type == DataType::Date32 || *lhs_type == DataType::Date64) => + if (*lhs_type == DataType::Date32 + || *lhs_type == DataType::Date64 + || matches!(lhs_type, DataType::Timestamp(_, _))) => { match rhs_type { DataType::Interval(_) => Some(lhs_type.clone()), diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs index 3c59a90b0ea1e..bb76862f4ebfe 100644 --- a/datafusion/physical-expr/src/expressions/datetime.rs +++ b/datafusion/physical-expr/src/expressions/datetime.rs @@ -17,9 +17,17 @@ use crate::expressions::delta::shift_months; use crate::PhysicalExpr; -use arrow::datatypes::{DataType, Schema}; +use arrow::array::{ + Array, ArrayRef, Date32Array, Date64Array, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow::compute::unary; +use arrow::datatypes::{ + DataType, Date32Type, Date64Type, Schema, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +}; use arrow::record_batch::RecordBatch; -use chrono::{Duration, NaiveDate}; +use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime}; use datafusion_common::Result; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::{ColumnarValue, Operator}; @@ -45,19 +53,21 @@ impl DateIntervalExpr { input_schema: &Schema, ) -> Result { match lhs.data_type(input_schema)? { - DataType::Date32 | DataType::Date64 => match rhs.data_type(input_schema)? { - DataType::Interval(_) => match &op { - Operator::Plus | Operator::Minus => Ok(Self { lhs, op, rhs }), - _ => Err(DataFusionError::Execution(format!( - "Invalid operator '{}' for DateIntervalExpr", - op + DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _) => { + match rhs.data_type(input_schema)? { + DataType::Interval(_) => match &op { + Operator::Plus | Operator::Minus => Ok(Self { lhs, op, rhs }), + _ => Err(DataFusionError::Execution(format!( + "Invalid operator '{}' for DateIntervalExpr", + op + ))), + }, + other => Err(DataFusionError::Execution(format!( + "Operation '{}' not support for type {}", + op, other ))), - }, - other => Err(DataFusionError::Execution(format!( - "Invalid rhs type '{}' for DateIntervalExpr", - other - ))), - }, + } + } other => Err(DataFusionError::Execution(format!( "Invalid lhs type '{}' for DateIntervalExpr", other @@ -89,28 +99,8 @@ impl PhysicalExpr for DateIntervalExpr { let dates = self.lhs.evaluate(batch)?; let intervals = self.rhs.evaluate(batch)?; - // Unwrap days since epoch - let operand = match dates { - ColumnarValue::Scalar(scalar) => scalar, - _ => Err(DataFusionError::Execution( - "Columnar execution is not yet supported for DateIntervalExpr" - .to_string(), - ))?, - }; - - // Convert to NaiveDate - let epoch = NaiveDate::from_ymd(1970, 1, 1); - let prior = match operand { - ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as i64)), - ScalarValue::Date64(Some(ms)) => epoch.add(Duration::milliseconds(ms)), - _ => Err(DataFusionError::Execution(format!( - "Invalid lhs type for DateIntervalExpr: {:?}", - operand - )))?, - }; - // Unwrap interval to add - let scalar = match &intervals { + let intervals = match &intervals { ColumnarValue::Scalar(interval) => interval, _ => Err(DataFusionError::Execution( "Columnar execution is not yet supported for DateIntervalExpr" @@ -130,38 +120,196 @@ impl PhysicalExpr for DateIntervalExpr { } }; - // Do math - let posterior = match scalar { - ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign), - ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign), - ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign), - other => Err(DataFusionError::Execution(format!( - "DateIntervalExpr does not support non-interval type {:?}", - other - )))?, - }; - - // convert back - let res = match operand { - ScalarValue::Date32(Some(_)) => { - let days = posterior.sub(epoch).num_days() as i32; - ColumnarValue::Scalar(ScalarValue::Date32(Some(days))) - } - ScalarValue::Date64(Some(_)) => { - let ms = posterior.sub(epoch).num_milliseconds(); - ColumnarValue::Scalar(ScalarValue::Date64(Some(ms))) - } - _ => Err(DataFusionError::Execution(format!( - "Invalid lhs type for DateIntervalExpr: {}", - scalar - )))?, - }; - Ok(res) + match dates { + ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, intervals), + ColumnarValue::Array(array) => evaluate_array(array, sign, intervals), + } } } +pub fn evaluate_array( + array: ArrayRef, + sign: i32, + scalar: &ScalarValue, +) -> Result { + let ret = match array.data_type() { + DataType::Date32 => { + let array = array.as_any().downcast_ref::().unwrap(); + Arc::new(unary::(array, |days| { + date32_add(days, scalar, sign).unwrap() + })) as ArrayRef + } + DataType::Date64 => { + let array = array.as_any().downcast_ref::().unwrap(); + Arc::new(unary::(array, |ms| { + date64_add(ms, scalar, sign).unwrap() + })) as ArrayRef + } + DataType::Timestamp(TimeUnit::Second, _) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Arc::new(unary::( + array, + |ts_s| seconds_add(ts_s, scalar, sign).unwrap(), + )) as ArrayRef + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Arc::new( + unary::( + array, + |ts_ms| milliseconds_add(ts_ms, scalar, sign).unwrap(), + ), + ) as ArrayRef + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Arc::new( + unary::( + array, + |ts_us| microseconds_add(ts_us, scalar, sign).unwrap(), + ), + ) as ArrayRef + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + Arc::new( + unary::( + array, + |ts_ns| nanoseconds_add(ts_ns, scalar, sign).unwrap(), + ), + ) as ArrayRef + } + _ => Err(DataFusionError::Execution(format!( + "Invalid lhs type for DateIntervalExpr: {}", + array.data_type() + )))?, + }; + Ok(ColumnarValue::Array(ret)) +} + +fn evaluate_scalar( + operand: ScalarValue, + sign: i32, + scalar: &ScalarValue, +) -> Result { + let res = match operand { + ScalarValue::Date32(Some(days)) => { + let value = date32_add(days, scalar, sign)?; + ColumnarValue::Scalar(ScalarValue::Date32(Some(value))) + } + ScalarValue::Date64(Some(ms)) => { + let value = date64_add(ms, scalar, sign)?; + ColumnarValue::Scalar(ScalarValue::Date64(Some(value))) + } + ScalarValue::TimestampSecond(Some(ts_s), zone) => { + let value = seconds_add(ts_s, scalar, sign)?; + ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone)) + } + ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => { + let value = milliseconds_add(ts_ms, scalar, sign)?; + ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(value), zone)) + } + ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => { + let value = microseconds_add(ts_us, scalar, sign)?; + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(value), zone)) + } + ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => { + let value = nanoseconds_add(ts_ns, scalar, sign)?; + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone)) + } + _ => Err(DataFusionError::Execution(format!( + "Invalid lhs type {} for DateIntervalExpr", + operand.get_datatype() + )))?, + }; + Ok(res) +} + +#[inline] +fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result { + let epoch = NaiveDate::from_ymd(1970, 1, 1); + let prior = epoch.add(Duration::days(days as i64)); + let posterior = do_date_math(prior, scalar, sign)?; + Ok(posterior.sub(epoch).num_days() as i32) +} + +#[inline] +fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result { + let epoch = NaiveDate::from_ymd(1970, 1, 1); + let prior = epoch.add(Duration::milliseconds(ms)); + let posterior = do_date_math(prior, scalar, sign)?; + Ok(posterior.sub(epoch).num_milliseconds()) +} + +#[inline] +fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result { + Ok(do_data_time_math(ts_s, 0, scalar, sign)?.timestamp()) +} + +#[inline] +fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result { + let secs = ts_ms / 1000; + let nsecs = ((ts_ms % 1000) * 1_000_000) as u32; + Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_millis()) +} + +#[inline] +fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result { + let secs = ts_us / 1_000_000; + let nsecs = ((ts_us % 1_000_000) * 1000) as u32; + Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000) +} + +#[inline] +fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result { + let secs = ts_ns / 1_000_000_000; + let nsecs = (ts_ns % 1_000_000_000) as u32; + Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos()) +} + +#[inline] +fn do_data_time_math( + secs: i64, + nsecs: u32, + scalar: &ScalarValue, + sign: i32, +) -> Result { + let prior = NaiveDateTime::from_timestamp(secs, nsecs); + do_date_math(prior, scalar, sign) +} + +fn do_date_math(prior: D, scalar: &ScalarValue, sign: i32) -> Result +where + D: Datelike + Add, +{ + Ok(match scalar { + ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign), + ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign), + ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign), + other => Err(DataFusionError::Execution(format!( + "DateIntervalExpr does not support non-interval type {:?}", + other + )))?, + }) +} + // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released -fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate { +fn add_m_d_nano(prior: D, interval: i128, sign: i32) -> D +where + D: Datelike + Add, +{ let interval = interval as u128; let nanos = (interval >> 64) as i64 * sign as i64; let days = (interval >> 32) as i32 * sign; @@ -172,7 +320,10 @@ fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate { } // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released -fn add_day_time(prior: NaiveDate, interval: i64, sign: i32) -> NaiveDate { +fn add_day_time(prior: D, interval: i64, sign: i32) -> D +where + D: Datelike + Add, +{ let interval = interval as u64; let days = (interval >> 32) as i32 * sign; let ms = interval as i32 * sign; @@ -187,7 +338,7 @@ mod tests { use crate::execution_props::ExecutionProps; use arrow::array::{ArrayRef, Date32Builder}; use arrow::datatypes::*; - use datafusion_common::{Result, ToDFSchema}; + use datafusion_common::{Column, Result, ToDFSchema}; use datafusion_expr::Expr; #[test] @@ -362,6 +513,165 @@ mod tests { Ok(()) } + #[test] + fn add_1_millisecond() -> Result<()> { + // setup + let now_ts_ns = chrono::Utc::now().timestamp_nanos(); + let dt = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None)); + let op = Operator::Plus; + let interval = create_day_time(0, 1); + let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))); + + // exercise + let res = exercise(&dt, op, &interval)?; + + // assert + match res { + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) => { + assert_eq!(ts, now_ts_ns + 1_000_000); + } + _ => Err(DataFusionError::NotImplemented( + "Unexpected result!".to_string(), + ))?, + } + Ok(()) + } + + #[test] + fn add_2_hours() -> Result<()> { + // setup + let now_ts_s = chrono::Utc::now().timestamp(); + let dt = Expr::Literal(ScalarValue::TimestampSecond(Some(now_ts_s), None)); + let op = Operator::Plus; + let interval = create_day_time(0, 2 * 3600 * 1_000); + let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))); + + // exercise + let res = exercise(&dt, op, &interval)?; + + // assert + match res { + ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), None)) => { + assert_eq!(ts, now_ts_s + 2 * 3600); + } + _ => Err(DataFusionError::NotImplemented( + "Unexpected result!".to_string(), + ))?, + } + Ok(()) + } + + #[test] + fn sub_4_hours() -> Result<()> { + // setup + let now_ts_s = chrono::Utc::now().timestamp(); + let dt = Expr::Literal(ScalarValue::TimestampSecond(Some(now_ts_s), None)); + let op = Operator::Minus; + let interval = create_day_time(0, 4 * 3600 * 1_000); + let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))); + + // exercise + let res = exercise(&dt, op, &interval)?; + + // assert + match res { + ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), None)) => { + assert_eq!(ts, now_ts_s - 4 * 3600); + } + _ => Err(DataFusionError::NotImplemented( + "Unexpected result!".to_string(), + ))?, + } + Ok(()) + } + + #[test] + fn add_8_days() -> Result<()> { + // setup + let now_ts_ns = chrono::Utc::now().timestamp_nanos(); + let dt = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None)); + let op = Operator::Plus; + let interval = create_day_time(8, 0); + let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))); + + // exercise + let res = exercise(&dt, op, &interval)?; + + // assert + match res { + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) => { + assert_eq!(ts, now_ts_ns + 8 * 86400 * 1_000_000_000); + } + _ => Err(DataFusionError::NotImplemented( + "Unexpected result!".to_string(), + ))?, + } + Ok(()) + } + + #[test] + fn sub_16_days() -> Result<()> { + // setup + let now_ts_ns = chrono::Utc::now().timestamp_nanos(); + let dt = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None)); + let op = Operator::Minus; + let interval = create_day_time(16, 0); + let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))); + + // exercise + let res = exercise(&dt, op, &interval)?; + + // assert + match res { + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) => { + assert_eq!(ts, now_ts_ns - 16 * 86400 * 1_000_000_000); + } + _ => Err(DataFusionError::NotImplemented( + "Unexpected result!".to_string(), + ))?, + } + Ok(()) + } + + #[test] + fn array_add_26_days() -> Result<()> { + let mut builder = Date32Builder::new(8); + builder.append_slice(&[0, 1, 2, 3, 4, 5, 6, 7]); + let a: ArrayRef = Arc::new(builder.finish()); + + let schema = Schema::new(vec![Field::new("a", DataType::Date32, false)]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + let dfs = schema.clone().to_dfschema()?; + let props = ExecutionProps::new(); + + let dt = Expr::Column(Column::from_name("a")); + let interval = create_day_time(26, 0); + let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))); + let op = Operator::Plus; + + let lhs = create_physical_expr(&dt, &dfs, &schema, &props)?; + let rhs = create_physical_expr(&interval, &dfs, &schema, &props)?; + + let cut = DateIntervalExpr::try_new(lhs, op, rhs, &schema)?; + let res = cut.evaluate(&batch)?; + + let mut builder = Date32Builder::new(8); + builder.append_slice(&[26, 27, 28, 29, 30, 31, 32, 33]); + let expected: ArrayRef = Arc::new(builder.finish()); + + // assert + match res { + ColumnarValue::Array(array) => { + assert_eq!(&array, &expected) + } + _ => Err(DataFusionError::NotImplemented( + "Unexpected result!".to_string(), + ))?, + } + + Ok(()) + } + #[test] fn invalid_interval() -> Result<()> { // setup diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index a7f774073f4ce..ffb3f29397772 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -90,7 +90,7 @@ pub fn create_physical_expr( rhs.data_type(input_schema)?, ) { ( - DataType::Date32 | DataType::Date64, + DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _), Operator::Plus | Operator::Minus, DataType::Interval(_), ) => Ok(Arc::new(DateIntervalExpr::try_new(