From c5b0e7cff30d3048e2567faecc0b496c35953e57 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Mon, 11 Mar 2024 10:24:12 -0400 Subject: [PATCH 1/4] Move make_date, to_char to datafusion-functions --- datafusion-cli/Cargo.lock | 1 + datafusion/expr/src/built_in_function.rs | 52 -- datafusion/expr/src/expr_fn.rs | 7 - datafusion/functions/Cargo.toml | 9 + datafusion/functions/benches/make_date.rs | 111 +++ .../benches/to_char.rs | 65 +- .../functions/src/datetime/make_date.rs | 305 +++++++ datafusion/functions/src/datetime/mod.rs | 90 ++ datafusion/functions/src/datetime/to_char.rs | 578 +++++++++++++ datafusion/physical-expr/Cargo.toml | 8 - datafusion/physical-expr/benches/make_date.rs | 115 --- .../physical-expr/src/datetime_expressions.rs | 781 +----------------- datafusion/physical-expr/src/functions.rs | 2 - datafusion/proto/proto/datafusion.proto | 4 +- datafusion/proto/src/generated/pbjson.rs | 6 - datafusion/proto/src/generated/prost.rs | 12 +- .../proto/src/logical_plan/from_proto.rs | 22 - datafusion/proto/src/logical_plan/to_proto.rs | 2 - 18 files changed, 1127 insertions(+), 1043 deletions(-) create mode 100644 datafusion/functions/benches/make_date.rs rename datafusion/{physical-expr => functions}/benches/to_char.rs (60%) create mode 100644 datafusion/functions/src/datetime/make_date.rs create mode 100644 datafusion/functions/src/datetime/to_char.rs delete mode 100644 datafusion/physical-expr/benches/make_date.rs diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index dd2edb9b96f19..08b71a2468504 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1248,6 +1248,7 @@ version = "36.0.0" dependencies = [ "arrow", "arrow-array", + "arrow-schema", "base64 0.22.0", "chrono", "datafusion-common", diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 6351e877df003..320819a7a4525 100644 --- a/datafusion/expr/src/built_in_function.rs +++ 
b/datafusion/expr/src/built_in_function.rs @@ -22,7 +22,6 @@ use std::fmt; use std::str::FromStr; use std::sync::{Arc, OnceLock}; -use crate::signature::TIMEZONE_WILDCARD; use crate::type_coercion::functions::data_types; use crate::{FuncMonotonicity, Signature, TypeSignature, Volatility}; @@ -228,8 +227,6 @@ pub enum BuiltinScalarFunction { CurrentDate, /// current_time CurrentTime, - /// make_date - MakeDate, /// translate Translate, /// trim @@ -248,8 +245,6 @@ pub enum BuiltinScalarFunction { SubstrIndex, /// find_in_set FindInSet, - /// to_char - ToChar, } /// Maps the sql function name to `BuiltinScalarFunction` @@ -388,8 +383,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Strpos => Volatility::Immutable, BuiltinScalarFunction::Substr => Volatility::Immutable, BuiltinScalarFunction::ToHex => Volatility::Immutable, - BuiltinScalarFunction::ToChar => Volatility::Immutable, - BuiltinScalarFunction::MakeDate => Volatility::Immutable, BuiltinScalarFunction::Translate => Volatility::Immutable, BuiltinScalarFunction::Trim => Volatility::Immutable, BuiltinScalarFunction::Upper => Volatility::Immutable, @@ -581,14 +574,12 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::FindInSet => { utf8_to_int_type(&input_expr_types[0], "find_in_set") } - BuiltinScalarFunction::ToChar => Ok(Utf8), BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)), BuiltinScalarFunction::Now => { Ok(Timestamp(Nanosecond, Some("+00:00".into()))) } BuiltinScalarFunction::CurrentDate => Ok(Date32), BuiltinScalarFunction::CurrentTime => Ok(Time64(Nanosecond)), - BuiltinScalarFunction::MakeDate => Ok(Date32), BuiltinScalarFunction::Translate => { utf8_to_str_type(&input_expr_types[0], "translate") } @@ -675,7 +666,6 @@ impl BuiltinScalarFunction { /// Return the argument [`Signature`] supported by this function pub fn signature(&self) -> Signature { use DataType::*; - use TimeUnit::*; use TypeSignature::*; // note: the physical expression must accept the type returned by 
this function or the execution panics. @@ -777,41 +767,6 @@ impl BuiltinScalarFunction { vec![Exact(vec![Utf8, Int64]), Exact(vec![LargeUtf8, Int64])], self.volatility(), ), - BuiltinScalarFunction::ToChar => Signature::one_of( - vec![ - Exact(vec![Date32, Utf8]), - Exact(vec![Date64, Utf8]), - Exact(vec![Time32(Millisecond), Utf8]), - Exact(vec![Time32(Second), Utf8]), - Exact(vec![Time64(Microsecond), Utf8]), - Exact(vec![Time64(Nanosecond), Utf8]), - Exact(vec![Timestamp(Second, None), Utf8]), - Exact(vec![ - Timestamp(Second, Some(TIMEZONE_WILDCARD.into())), - Utf8, - ]), - Exact(vec![Timestamp(Millisecond, None), Utf8]), - Exact(vec![ - Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())), - Utf8, - ]), - Exact(vec![Timestamp(Microsecond, None), Utf8]), - Exact(vec![ - Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())), - Utf8, - ]), - Exact(vec![Timestamp(Nanosecond, None), Utf8]), - Exact(vec![ - Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())), - Utf8, - ]), - Exact(vec![Duration(Second), Utf8]), - Exact(vec![Duration(Millisecond), Utf8]), - Exact(vec![Duration(Microsecond), Utf8]), - Exact(vec![Duration(Nanosecond), Utf8]), - ], - self.volatility(), - ), BuiltinScalarFunction::FromUnixtime => { Signature::uniform(1, vec![Int64], self.volatility()) } @@ -974,11 +929,6 @@ impl BuiltinScalarFunction { | BuiltinScalarFunction::CurrentTime => { Signature::uniform(0, vec![], self.volatility()) } - BuiltinScalarFunction::MakeDate => Signature::uniform( - 3, - vec![Int32, Int64, UInt32, UInt64, Utf8], - self.volatility(), - ), BuiltinScalarFunction::Iszero => Signature::one_of( vec![Exact(vec![Float32]), Exact(vec![Float64])], self.volatility(), @@ -1106,8 +1056,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Now => &["now"], BuiltinScalarFunction::CurrentDate => &["current_date", "today"], BuiltinScalarFunction::CurrentTime => &["current_time"], - BuiltinScalarFunction::MakeDate => &["make_date"], - BuiltinScalarFunction::ToChar => 
&["to_char", "date_format"], BuiltinScalarFunction::FromUnixtime => &["from_unixtime"], // hashing functions diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index d1ae06d68f133..248fb5d9ec0e7 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -799,12 +799,6 @@ nary_scalar_expr!( ); // date functions -scalar_expr!( - ToChar, - to_char, - datetime format, - "converts a date, time, timestamp or duration to a string based on the provided format" -); scalar_expr!( FromUnixtime, from_unixtime, @@ -814,7 +808,6 @@ scalar_expr!( scalar_expr!(CurrentDate, current_date, ,"returns current UTC date as a [`DataType::Date32`] value"); scalar_expr!(Now, now, ,"returns current timestamp in nanoseconds, using the same value for all instances of now() in same statement"); scalar_expr!(CurrentTime, current_time, , "returns current UTC time as a [`DataType::Time64`] value"); -scalar_expr!(MakeDate, make_date, year month day, "make a date from year, month and day component parts"); scalar_expr!(Nanvl, nanvl, x y, "returns x if x is not NaN otherwise returns y"); scalar_expr!( Iszero, diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 0a1215e2464b5..a1e6fbb070473 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -51,6 +51,7 @@ path = "src/lib.rs" [dependencies] arrow = { workspace = true } arrow-array = { workspace = true } +arrow-schema = { workspace = true } base64 = { version = "0.22", optional = true } chrono = { workspace = true } datafusion-common = { workspace = true } @@ -75,3 +76,11 @@ name = "to_timestamp" [[bench]] harness = false name = "regx" + +[[bench]] +harness = false +name = "make_date" + +[[bench]] +harness = false +name = "to_char" diff --git a/datafusion/functions/benches/make_date.rs b/datafusion/functions/benches/make_date.rs new file mode 100644 index 0000000000000..e64d49d3cf6c4 --- /dev/null +++ 
b/datafusion/functions/benches/make_date.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use rand::rngs::ThreadRng; +use rand::Rng; + +use datafusion_expr::{lit, Expr}; +use datafusion_functions::expr_fn::make_date; + +fn years(rng: &mut ThreadRng) -> Vec { + let mut years = vec![]; + for _ in 0..1000 { + years.push(lit(rng.gen_range(1900..2050))); + } + + years +} + +fn months(rng: &mut ThreadRng) -> Vec { + let mut months = vec![]; + for _ in 0..1000 { + months.push(lit(rng.gen_range(1..13))); + } + + months +} + +fn days(rng: &mut ThreadRng) -> Vec { + let mut days = vec![]; + for _ in 0..1000 { + days.push(lit(rng.gen_range(1..29))); + } + + days +} +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("make_date_col_col_col_1000", |b| { + let mut rng = rand::thread_rng(); + let years = years(&mut rng); + let months = months(&mut rng); + let days = days(&mut rng); + + b.iter(|| { + years.iter().enumerate().for_each(|(idx, i)| { + black_box(make_date( + i.clone(), + months.get(idx).unwrap().clone(), + days.get(idx).unwrap().clone(), + )); + }) + }) + }); + + 
c.bench_function("make_date_scalar_col_col_1000", |b| { + let mut rng = rand::thread_rng(); + let year = lit(2025); + let months = months(&mut rng); + let days = days(&mut rng); + + b.iter(|| { + months.iter().enumerate().for_each(|(idx, i)| { + black_box(make_date( + year.clone(), + i.clone(), + days.get(idx).unwrap().clone(), + )); + }) + }) + }); + + c.bench_function("make_date_scalar_scalar_col_1000", |b| { + let mut rng = rand::thread_rng(); + let year = lit(2025); + let months = lit(11); + let days = days(&mut rng); + + b.iter(|| { + days.iter().for_each(|i| { + black_box(make_date(year.clone(), months.clone(), i.clone())); + }) + }) + }); + + c.bench_function("make_date_scalar_scalar_scalar", |b| { + let year = lit(2025); + let month = lit(11); + let day = lit(26); + + b.iter(|| black_box(make_date(year.clone(), month.clone(), day.clone()))) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/physical-expr/benches/to_char.rs b/datafusion/functions/benches/to_char.rs similarity index 60% rename from datafusion/physical-expr/benches/to_char.rs rename to datafusion/functions/benches/to_char.rs index 3bcea09acf03b..ff08537071bb9 100644 --- a/datafusion/physical-expr/benches/to_char.rs +++ b/datafusion/functions/benches/to_char.rs @@ -17,9 +17,6 @@ extern crate criterion; -use std::sync::Arc; - -use arrow_array::{ArrayRef, Date32Array, StringArray}; use chrono::prelude::*; use chrono::TimeDelta; use criterion::{black_box, criterion_group, criterion_main, Criterion}; @@ -27,10 +24,9 @@ use rand::rngs::ThreadRng; use rand::seq::SliceRandom; use rand::Rng; -use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::TimestampNanosecond; -use datafusion_expr::ColumnarValue; -use datafusion_physical_expr::datetime_expressions::to_char; +use datafusion_expr::{lit, Expr}; +use datafusion_functions::expr_fn::to_char; fn random_date_in_range( rng: &mut ThreadRng, @@ -42,8 +38,8 @@ fn 
random_date_in_range( start_date + TimeDelta::try_days(random_days).unwrap() } -fn data(rng: &mut ThreadRng) -> Date32Array { - let mut data: Vec = vec![]; +fn data(rng: &mut ThreadRng) -> Vec { + let mut data: Vec = vec![]; let unix_days_from_ce = NaiveDate::from_ymd_opt(1970, 1, 1) .unwrap() .num_days_from_ce(); @@ -54,16 +50,15 @@ fn data(rng: &mut ThreadRng) -> Date32Array { .parse::() .expect("Date should parse"); for _ in 0..1000 { - data.push( - random_date_in_range(rng, start_date, end_date).num_days_from_ce() - - unix_days_from_ce, - ); + data.push(lit(random_date_in_range(rng, start_date, end_date) + .num_days_from_ce() + - unix_days_from_ce)); } - Date32Array::from(data) + data } -fn patterns(rng: &mut ThreadRng) -> StringArray { +fn patterns(rng: &mut ThreadRng) -> Vec { let samples = vec![ "%Y:%m:%d".to_string(), "%d-%m-%Y".to_string(), @@ -71,39 +66,36 @@ fn patterns(rng: &mut ThreadRng) -> StringArray { "%Y%m%d".to_string(), "%Y...%m...%d".to_string(), ]; - let mut data: Vec = vec![]; + let mut data: Vec = vec![]; for _ in 0..1000 { - data.push(samples.choose(rng).unwrap().to_string()); + data.push(lit(samples.choose(rng).unwrap().to_string())); } - StringArray::from(data) + data } fn criterion_benchmark(c: &mut Criterion) { c.bench_function("to_char_array_array_1000", |b| { let mut rng = rand::thread_rng(); - let data = ColumnarValue::Array(Arc::new(data(&mut rng)) as ArrayRef); - let patterns = ColumnarValue::Array(Arc::new(patterns(&mut rng)) as ArrayRef); + let data = data(&mut rng); + let patterns = patterns(&mut rng); b.iter(|| { - black_box( - to_char(&[data.clone(), patterns.clone()]) - .expect("to_char should work on valid values"), - ) + data.iter().enumerate().for_each(|(idx, i)| { + black_box(to_char(i.clone(), patterns.get(idx).unwrap().clone())); + }) }) }); c.bench_function("to_char_array_scalar_1000", |b| { let mut rng = rand::thread_rng(); - let data = ColumnarValue::Array(Arc::new(data(&mut rng)) as ArrayRef); - let patterns = - 
ColumnarValue::Scalar(ScalarValue::Utf8(Some("%Y-%m-%d".to_string()))); + let data = data(&mut rng); + let patterns = lit("%Y-%m-%d"); b.iter(|| { - black_box( - to_char(&[data.clone(), patterns.clone()]) - .expect("to_char should work on valid values"), - ) + data.iter().for_each(|i| { + black_box(to_char(i.clone(), patterns.clone())); + }) }) }); @@ -116,17 +108,10 @@ fn criterion_benchmark(c: &mut Criterion) { .and_utc() .timestamp_nanos_opt() .unwrap(); - let data = ColumnarValue::Scalar(TimestampNanosecond(Some(timestamp), None)); - let pattern = ColumnarValue::Scalar(ScalarValue::Utf8(Some( - "%d-%m-%Y %H:%M:%S".to_string(), - ))); + let data = lit(TimestampNanosecond(Some(timestamp), None)); + let pattern = lit("%d-%m-%Y %H:%M:%S"); - b.iter(|| { - black_box( - to_char(&[data.clone(), pattern.clone()]) - .expect("to_char should work on valid values"), - ) - }) + b.iter(|| black_box(to_char(data.clone(), pattern.clone()))) }); } diff --git a/datafusion/functions/src/datetime/make_date.rs b/datafusion/functions/src/datetime/make_date.rs new file mode 100644 index 0000000000000..b8010f64d270a --- /dev/null +++ b/datafusion/functions/src/datetime/make_date.rs @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::DataType; +use arrow::datatypes::DataType::{Date32, Int32, Int64, UInt32, UInt64, Utf8}; +use arrow_array::builder::PrimitiveBuilder; +use arrow_array::cast::AsArray; +use arrow_array::types::{Date32Type, Int32Type}; +use arrow_array::PrimitiveArray; +use chrono::prelude::*; + +use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +#[derive(Debug)] +pub(super) struct MakeDateFunc { + signature: Signature, +} + +impl MakeDateFunc { + pub fn new() -> Self { + Self { + signature: Signature::uniform( + 3, + vec![Int32, Int64, UInt32, UInt64, Utf8], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for MakeDateFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "make_date" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(Date32) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.len() != 3 { + return exec_err!( + "make_date function requires 3 arguments, got {}", + args.len() + ); + } + + // first, identify if any of the arguments is an Array. If yes, store its `len`, + // as any scalar will need to be converted to an array of len `len`. 
+ let len = args + .iter() + .fold(Option::::None, |acc, arg| match arg { + ColumnarValue::Scalar(_) => acc, + ColumnarValue::Array(a) => Some(a.len()), + }); + + let is_scalar = len.is_none(); + let array_size = if is_scalar { 1 } else { len.unwrap() }; + + let years = args[0].cast_to(&DataType::Int32, None)?; + let months = args[1].cast_to(&DataType::Int32, None)?; + let days = args[2].cast_to(&DataType::Int32, None)?; + + // since the epoch for the date32 datatype is the unix epoch + // we need to subtract the unix epoch from the current date + // note this can result in a negative value + let unix_days_from_ce = NaiveDate::from_ymd_opt(1970, 1, 1) + .unwrap() + .num_days_from_ce(); + + let mut builder: PrimitiveBuilder = + PrimitiveArray::builder(array_size); + + let construct_date_fn = |builder: &mut PrimitiveBuilder, + year: i32, + month: i32, + day: i32, + unix_days_from_ce: i32| + -> Result<()> { + let Ok(m) = u32::try_from(month) else { + return exec_err!("Month value '{month:?}' is out of range"); + }; + let Ok(d) = u32::try_from(day) else { + return exec_err!("Day value '{day:?}' is out of range"); + }; + + let date = NaiveDate::from_ymd_opt(year, m, d); + + match date { + Some(d) => builder.append_value(d.num_days_from_ce() - unix_days_from_ce), + None => { + return exec_err!("Unable to parse date from {year}, {month}, {day}") + } + }; + Ok(()) + }; + + let scalar_value_fn = |col: &ColumnarValue| -> Result { + let ColumnarValue::Scalar(s) = col else { + return exec_err!("Expected scalar value"); + }; + let ScalarValue::Int32(Some(i)) = s else { + return exec_err!("Unable to parse date from null/empty value"); + }; + Ok(*i) + }; + + // For scalar only columns the operation is faster without using the PrimitiveArray + if is_scalar { + construct_date_fn( + &mut builder, + scalar_value_fn(&years)?, + scalar_value_fn(&months)?, + scalar_value_fn(&days)?, + unix_days_from_ce, + )?; + } else { + let to_primitive_array = |col: &ColumnarValue, + scalar_count: 
usize| + -> Result> { + match col { + ColumnarValue::Array(a) => { + Ok(a.as_primitive::().to_owned()) + } + _ => { + let v = scalar_value_fn(col).unwrap(); + Ok(PrimitiveArray::::from_value(v, scalar_count)) + } + } + }; + + let years = to_primitive_array(&years, array_size).unwrap(); + let months = to_primitive_array(&months, array_size).unwrap(); + let days = to_primitive_array(&days, array_size).unwrap(); + for i in 0..array_size { + construct_date_fn( + &mut builder, + years.value(i), + months.value(i), + days.value(i), + unix_days_from_ce, + )?; + } + } + + let arr = builder.finish(); + + if is_scalar { + // If all inputs are scalar, keeps output as scalar + Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some( + arr.value(0), + )))) + } else { + Ok(ColumnarValue::Array(Arc::new(arr))) + } + } +} + +#[cfg(test)] +mod tests { + use crate::datetime::make_date::MakeDateFunc; + use arrow_array::{Array, Date32Array, Int32Array, Int64Array, UInt32Array}; + use datafusion_common::ScalarValue; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + use std::sync::Arc; + + #[test] + fn test_make_date() { + let res = MakeDateFunc::new() + .invoke(&[ + ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))), + ColumnarValue::Scalar(ScalarValue::Int64(Some(1))), + ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), + ]) + .expect("that make_date parsed values without error"); + + if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { + assert_eq!(19736, date.unwrap()); + } else { + panic!("Expected a scalar value") + } + + let res = MakeDateFunc::new() + .invoke(&[ + ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))), + ColumnarValue::Scalar(ScalarValue::UInt64(Some(1))), + ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), + ]) + .expect("that make_date parsed values without error"); + + if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { + assert_eq!(19736, date.unwrap()); + } else { + panic!("Expected a scalar value") + } + + let res = 
MakeDateFunc::new() + .invoke(&[ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("1".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("14".to_string()))), + ]) + .expect("that make_date parsed values without error"); + + if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { + assert_eq!(19736, date.unwrap()); + } else { + panic!("Expected a scalar value") + } + + let years = Arc::new((2021..2025).map(Some).collect::()); + let months = Arc::new((1..5).map(Some).collect::()); + let days = Arc::new((11..15).map(Some).collect::()); + let res = MakeDateFunc::new() + .invoke(&[ + ColumnarValue::Array(years), + ColumnarValue::Array(months), + ColumnarValue::Array(days), + ]) + .expect("that make_date parsed values without error"); + + if let ColumnarValue::Array(array) = res { + assert_eq!(array.len(), 4); + let mut builder = Date32Array::builder(4); + builder.append_value(18_638); + builder.append_value(19_035); + builder.append_value(19_429); + builder.append_value(19_827); + assert_eq!(&builder.finish() as &dyn Array, array.as_ref()); + } else { + panic!("Expected a columnar array") + } + + // + // Fallible test cases + // + + // invalid number of arguments + let res = MakeDateFunc::new() + .invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]); + assert_eq!( + res.err().unwrap().strip_backtrace(), + "Execution error: make_date function requires 3 arguments, got 1" + ); + + // invalid type + let res = MakeDateFunc::new().invoke(&[ + ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), + ]); + assert_eq!( + res.err().unwrap().strip_backtrace(), + "Arrow error: Cast error: Casting from Interval(YearMonth) to Int32 not supported" + ); + + // overflow of month + let res = MakeDateFunc::new().invoke(&[ + 
ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))), + ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))), + ColumnarValue::Scalar(ScalarValue::Int32(Some(22))), + ]); + assert_eq!( + res.err().unwrap().strip_backtrace(), + "Arrow error: Cast error: Can't cast value 18446744073709551615 to type Int32" + ); + + // overflow of day + let res = MakeDateFunc::new().invoke(&[ + ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))), + ColumnarValue::Scalar(ScalarValue::Int32(Some(22))), + ColumnarValue::Scalar(ScalarValue::UInt32(Some(u32::MAX))), + ]); + assert_eq!( + res.err().unwrap().strip_backtrace(), + "Arrow error: Cast error: Can't cast value 4294967295 to type Int32" + ); + } +} diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 7792174a7684f..065543275a515 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -25,6 +25,8 @@ mod common; mod date_bin; mod date_part; mod date_trunc; +mod make_date; +mod to_char; mod to_date; mod to_timestamp; mod to_unixtime; @@ -33,6 +35,8 @@ mod to_unixtime; make_udf_function!(date_bin::DateBinFunc, DATE_BIN, date_bin); make_udf_function!(date_part::DatePartFunc, DATE_PART, date_part); make_udf_function!(date_trunc::DateTruncFunc, DATE_TRUNC, date_trunc); +make_udf_function!(make_date::MakeDateFunc, MAKE_DATE, make_date); +make_udf_function!(to_char::ToCharFunc, TO_CHAR, to_char); make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date); make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime); make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp); @@ -78,6 +82,90 @@ pub mod expr_fn { super::date_trunc().call(vec![part, date]) } + #[doc = "make a date from year, month and day component parts"] + pub fn make_date(year: Expr, month: Expr, day: Expr) -> Expr { + super::make_date().call(vec![year, month, day]) + } + + /// Returns a string representation of a date, time, timestamp or 
duration based + /// on a Chrono pattern. + /// + /// The syntax for the patterns can be found at + /// <https://docs.rs/chrono/latest/chrono/format/strftime/index.html> + /// + /// # Examples + /// + /// ```ignore + /// # use chrono::prelude::*; + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use datafusion_common::ScalarValue::TimestampNanosecond; + /// # use std::sync::Arc; + /// # use arrow_array::{Date32Array, RecordBatch, StringArray}; + /// # use arrow_schema::{DataType, Field, Schema}; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let schema = Arc::new(Schema::new(vec![ + /// Field::new("values", DataType::Date32, false), + /// Field::new("patterns", DataType::Utf8, false), + /// ])); + /// + /// let batch = RecordBatch::try_new( + /// schema, + /// vec![ + /// Arc::new(Date32Array::from(vec![ + /// 18506, + /// 18507, + /// 18508, + /// 18509, + /// ])), + /// Arc::new(StringArray::from(vec![ + /// "%Y-%m-%d", + /// "%Y:%m:%d", + /// "%Y%m%d", + /// "%d-%m-%Y", + /// ])), + /// ], + /// )?; + /// + /// let ctx = SessionContext::new(); + /// ctx.register_batch("t", batch)?; + /// let df = ctx.table("t").await?; + /// + /// // use the to_char function to convert col 'values', + /// // to strings using patterns in col 'patterns' + /// let df = df.with_column( + /// "date_str", + /// to_char(col("values"), col("patterns")) + /// )?; + /// // Note that providing a scalar value for the pattern + /// // is more performant + /// let df = df.with_column( + /// "date_str2", + /// to_char(col("values"), lit("%d-%m-%Y")) + /// )?; + /// // literals can be used as well with dataframe calls + /// let timestamp = "2026-07-08T09:10:11" + /// .parse::<NaiveDateTime>() + /// .unwrap() + /// .with_nanosecond(56789) + /// .unwrap() + /// .timestamp_nanos_opt() + /// .unwrap(); + /// let df = df.with_column( + /// "timestamp_str", + /// to_char(lit(TimestampNanosecond(Some(timestamp), None)), lit("%d-%m-%Y %H:%M:%S")) + /// )?; + /// + /// df.show().await?; + /// + /// # Ok(()) + /// # } + /// ``` +
pub fn to_char(datetime: Expr, format: Expr) -> Expr { + super::to_char().call(vec![datetime, format]) + } + /// ```ignore /// # use std::sync::Arc; /// @@ -165,6 +253,8 @@ pub fn functions() -> Vec> { date_bin(), date_part(), date_trunc(), + make_date(), + to_char(), to_date(), to_unixtime(), to_timestamp(), diff --git a/datafusion/functions/src/datetime/to_char.rs b/datafusion/functions/src/datetime/to_char.rs new file mode 100644 index 0000000000000..90b3a1d35353c --- /dev/null +++ b/datafusion/functions/src/datetime/to_char.rs @@ -0,0 +1,578 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::DataType; +use arrow::util::display::{ArrayFormatter, DurationFormat, FormatOptions}; +use arrow_array::cast::AsArray; +use arrow_array::{Array, ArrayRef, StringArray}; +use arrow_schema::DataType::{Date32, Date64, Duration, Time32, Time64, Timestamp, Utf8}; +use arrow_schema::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; + +use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_expr::TypeSignature::Exact; +use datafusion_expr::{ + ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD, +}; + +#[derive(Debug)] +pub(super) struct ToCharFunc { + signature: Signature, + aliases: Vec, +} + +impl ToCharFunc { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + Exact(vec![Date32, Utf8]), + Exact(vec![Date64, Utf8]), + Exact(vec![Time32(Millisecond), Utf8]), + Exact(vec![Time32(Second), Utf8]), + Exact(vec![Time64(Microsecond), Utf8]), + Exact(vec![Time64(Nanosecond), Utf8]), + Exact(vec![Timestamp(Second, None), Utf8]), + Exact(vec![ + Timestamp(Second, Some(TIMEZONE_WILDCARD.into())), + Utf8, + ]), + Exact(vec![Timestamp(Millisecond, None), Utf8]), + Exact(vec![ + Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())), + Utf8, + ]), + Exact(vec![Timestamp(Microsecond, None), Utf8]), + Exact(vec![ + Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())), + Utf8, + ]), + Exact(vec![Timestamp(Nanosecond, None), Utf8]), + Exact(vec![ + Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())), + Utf8, + ]), + Exact(vec![Duration(Second), Utf8]), + Exact(vec![Duration(Millisecond), Utf8]), + Exact(vec![Duration(Microsecond), Utf8]), + Exact(vec![Duration(Nanosecond), Utf8]), + ], + Volatility::Immutable, + ), + aliases: vec![String::from("date_format")], + } + } +} + +impl ScalarUDFImpl for ToCharFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "to_char" + } + + fn signature(&self) -> &Signature { + 
&self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(Utf8) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.len() != 2 { + return exec_err!( + "to_char function requires 2 arguments, got {}", + args.len() + ); + } + + match &args[1] { + // null format, use default formats + ColumnarValue::Scalar(ScalarValue::Utf8(None)) + | ColumnarValue::Scalar(ScalarValue::Null) => { + _to_char_scalar(args[0].clone(), None) + } + // constant format + ColumnarValue::Scalar(ScalarValue::Utf8(Some(format))) => { + // invoke to_char_scalar with the known string, without converting to array + _to_char_scalar(args[0].clone(), Some(format)) + } + ColumnarValue::Array(_) => _to_char_array(args), + _ => { + exec_err!( + "Format for `to_char` must be non-null Utf8, received {:?}", + args[1].data_type() + ) + } + } + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +fn _build_format_options<'a>( + data_type: &DataType, + format: Option<&'a str>, +) -> Result, Result> { + let Some(format) = format else { + return Ok(FormatOptions::new()); + }; + let format_options = match data_type { + Date32 => FormatOptions::new().with_date_format(Some(format)), + Date64 => FormatOptions::new().with_datetime_format(Some(format)), + Time32(_) => FormatOptions::new().with_time_format(Some(format)), + Time64(_) => FormatOptions::new().with_time_format(Some(format)), + Timestamp(_, _) => FormatOptions::new() + .with_timestamp_format(Some(format)) + .with_timestamp_tz_format(Some(format)), + Duration(_) => FormatOptions::new().with_duration_format( + if "ISO8601".eq_ignore_ascii_case(format) { + DurationFormat::ISO8601 + } else { + DurationFormat::Pretty + }, + ), + other => { + return Err(exec_err!( + "to_char only supports date, time, timestamp and duration data types, received {other:?}" + )); + } + }; + Ok(format_options) +} + +/// Special version when arg\[1] is a scalar +fn _to_char_scalar( + expression: ColumnarValue, + format: 
Option<&str>, +) -> Result { + // it's possible that the expression is a scalar however because + // of the implementation in arrow-rs we need to convert it to an array + let data_type = &expression.data_type(); + let is_scalar_expression = matches!(&expression, ColumnarValue::Scalar(_)); + let array = expression.into_array(1)?; + let format_options = match _build_format_options(data_type, format) { + Ok(value) => value, + Err(value) => return value, + }; + + let formatter = ArrayFormatter::try_new(array.as_ref(), &format_options)?; + let formatted: Result, arrow_schema::ArrowError> = (0..array.len()) + .map(|i| formatter.value(i).try_to_string()) + .collect(); + + if let Ok(formatted) = formatted { + if is_scalar_expression { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + formatted.first().unwrap().to_string(), + )))) + } else { + Ok(ColumnarValue::Array( + Arc::new(StringArray::from(formatted)) as ArrayRef + )) + } + } else { + exec_err!("{}", formatted.unwrap_err()) + } +} + +fn _to_char_array(args: &[ColumnarValue]) -> Result { + let arrays = ColumnarValue::values_to_arrays(args)?; + let mut results: Vec = vec![]; + let format_array = arrays[1].as_string::(); + let data_type = arrays[0].data_type(); + + for idx in 0..arrays[0].len() { + let format = if format_array.is_null(idx) { + None + } else { + Some(format_array.value(idx)) + }; + let format_options = match _build_format_options(data_type, format) { + Ok(value) => value, + Err(value) => return value, + }; + // this isn't ideal but this can't use ValueFormatter as it isn't independent + // from ArrayFormatter + let formatter = ArrayFormatter::try_new(arrays[0].as_ref(), &format_options)?; + let result = formatter.value(idx).try_to_string(); + match result { + Ok(value) => results.push(value), + Err(e) => return exec_err!("{}", e), + } + } + + match args[0] { + ColumnarValue::Array(_) => Ok(ColumnarValue::Array(Arc::new(StringArray::from( + results, + )) as ArrayRef)), + ColumnarValue::Scalar(_) => 
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + results.first().unwrap().to_string(), + )))), + } +} + +#[cfg(test)] +mod tests { + use crate::datetime::to_char::ToCharFunc; + use arrow_array::{ + Array, ArrayRef, Date32Array, Date64Array, StringArray, Time32MillisecondArray, + Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, + }; + use chrono::{NaiveDateTime, Timelike}; + use datafusion_common::ScalarValue; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + use std::sync::Arc; + + #[test] + fn test_to_char() { + let date = "2020-01-02T03:04:05" + .parse::() + .unwrap() + .with_nanosecond(12345) + .unwrap(); + let date2 = "2026-07-08T09:10:11" + .parse::() + .unwrap() + .with_nanosecond(56789) + .unwrap(); + + let scalar_data = vec![ + ( + ScalarValue::Date32(Some(18506)), + ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), + "2020::09::01".to_string(), + ), + ( + ScalarValue::Date64(Some(date.and_utc().timestamp_millis())), + ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), + "2020::01::02".to_string(), + ), + ( + ScalarValue::Time32Second(Some(31851)), + ScalarValue::Utf8(Some("%H-%M-%S".to_string())), + "08-50-51".to_string(), + ), + ( + ScalarValue::Time32Millisecond(Some(18506000)), + ScalarValue::Utf8(Some("%H-%M-%S".to_string())), + "05-08-26".to_string(), + ), + ( + ScalarValue::Time64Microsecond(Some(12344567000)), + ScalarValue::Utf8(Some("%H-%M-%S %f".to_string())), + "03-25-44 567000000".to_string(), + ), + ( + ScalarValue::Time64Nanosecond(Some(12344567890000)), + ScalarValue::Utf8(Some("%H-%M-%S %f".to_string())), + "03-25-44 567890000".to_string(), + ), + ( + ScalarValue::TimestampSecond(Some(date.and_utc().timestamp()), None), + ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H".to_string())), + "2020::01::02 05::04::03".to_string(), + ), + ( + ScalarValue::TimestampMillisecond( + Some(date.and_utc().timestamp_millis()), 
+ None, + ), + ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H".to_string())), + "2020::01::02 05::04::03".to_string(), + ), + ( + ScalarValue::TimestampMicrosecond( + Some(date.and_utc().timestamp_micros()), + None, + ), + ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H %f".to_string())), + "2020::01::02 05::04::03 000012000".to_string(), + ), + ( + ScalarValue::TimestampNanosecond( + Some(date.and_utc().timestamp_nanos_opt().unwrap()), + None, + ), + ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H %f".to_string())), + "2020::01::02 05::04::03 000012345".to_string(), + ), + ]; + + for (value, format, expected) in scalar_data { + let result = ToCharFunc::new() + .invoke(&[ColumnarValue::Scalar(value), ColumnarValue::Scalar(format)]) + .expect("that to_char parsed values without error"); + + if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result { + assert_eq!(expected, date.unwrap()); + } else { + panic!("Expected a scalar value") + } + } + + let scalar_array_data = vec![ + ( + ScalarValue::Date32(Some(18506)), + StringArray::from(vec!["%Y::%m::%d".to_string()]), + "2020::09::01".to_string(), + ), + ( + ScalarValue::Date64(Some(date.and_utc().timestamp_millis())), + StringArray::from(vec!["%Y::%m::%d".to_string()]), + "2020::01::02".to_string(), + ), + ( + ScalarValue::Time32Second(Some(31851)), + StringArray::from(vec!["%H-%M-%S".to_string()]), + "08-50-51".to_string(), + ), + ( + ScalarValue::Time32Millisecond(Some(18506000)), + StringArray::from(vec!["%H-%M-%S".to_string()]), + "05-08-26".to_string(), + ), + ( + ScalarValue::Time64Microsecond(Some(12344567000)), + StringArray::from(vec!["%H-%M-%S %f".to_string()]), + "03-25-44 567000000".to_string(), + ), + ( + ScalarValue::Time64Nanosecond(Some(12344567890000)), + StringArray::from(vec!["%H-%M-%S %f".to_string()]), + "03-25-44 567890000".to_string(), + ), + ( + ScalarValue::TimestampSecond(Some(date.and_utc().timestamp()), None), + StringArray::from(vec!["%Y::%m::%d %S::%M::%H".to_string()]), + "2020::01::02 
05::04::03".to_string(), + ), + ( + ScalarValue::TimestampMillisecond( + Some(date.and_utc().timestamp_millis()), + None, + ), + StringArray::from(vec!["%Y::%m::%d %S::%M::%H".to_string()]), + "2020::01::02 05::04::03".to_string(), + ), + ( + ScalarValue::TimestampMicrosecond( + Some(date.and_utc().timestamp_micros()), + None, + ), + StringArray::from(vec!["%Y::%m::%d %S::%M::%H %f".to_string()]), + "2020::01::02 05::04::03 000012000".to_string(), + ), + ( + ScalarValue::TimestampNanosecond( + Some(date.and_utc().timestamp_nanos_opt().unwrap()), + None, + ), + StringArray::from(vec!["%Y::%m::%d %S::%M::%H %f".to_string()]), + "2020::01::02 05::04::03 000012345".to_string(), + ), + ]; + + for (value, format, expected) in scalar_array_data { + let result = ToCharFunc::new() + .invoke(&[ + ColumnarValue::Scalar(value), + ColumnarValue::Array(Arc::new(format) as ArrayRef), + ]) + .expect("that to_char parsed values without error"); + + if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result { + assert_eq!(expected, date.unwrap()); + } else { + panic!("Expected a scalar value") + } + } + + let array_scalar_data = vec![ + ( + Arc::new(Date32Array::from(vec![18506, 18507])) as ArrayRef, + ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), + StringArray::from(vec!["2020::09::01", "2020::09::02"]), + ), + ( + Arc::new(Date64Array::from(vec![ + date.and_utc().timestamp_millis(), + date2.and_utc().timestamp_millis(), + ])) as ArrayRef, + ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), + StringArray::from(vec!["2020::01::02", "2026::07::08"]), + ), + ]; + + let array_array_data = vec![ + ( + Arc::new(Date32Array::from(vec![18506, 18507])) as ArrayRef, + StringArray::from(vec!["%Y::%m::%d", "%d::%m::%Y"]), + StringArray::from(vec!["2020::09::01", "02::09::2020"]), + ), + ( + Arc::new(Date64Array::from(vec![ + date.and_utc().timestamp_millis(), + date2.and_utc().timestamp_millis(), + ])) as ArrayRef, + StringArray::from(vec!["%Y::%m::%d", "%d::%m::%Y"]), + 
StringArray::from(vec!["2020::01::02", "08::07::2026"]), + ), + ( + Arc::new(Time32MillisecondArray::from(vec![1850600, 1860700])) + as ArrayRef, + StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), + StringArray::from(vec!["00:30:50", "00::31::00"]), + ), + ( + Arc::new(Time32SecondArray::from(vec![18506, 18507])) as ArrayRef, + StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), + StringArray::from(vec!["05:08:26", "05::08::27"]), + ), + ( + Arc::new(Time64MicrosecondArray::from(vec![12344567000, 22244567000])) + as ArrayRef, + StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), + StringArray::from(vec!["03:25:44", "06::10::44"]), + ), + ( + Arc::new(Time64NanosecondArray::from(vec![ + 1234456789000, + 2224456789000, + ])) as ArrayRef, + StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), + StringArray::from(vec!["00:20:34", "00::37::04"]), + ), + ( + Arc::new(TimestampSecondArray::from(vec![ + date.and_utc().timestamp(), + date2.and_utc().timestamp(), + ])) as ArrayRef, + StringArray::from(vec!["%Y::%m::%d %S::%M::%H", "%d::%m::%Y %S-%M-%H"]), + StringArray::from(vec![ + "2020::01::02 05::04::03", + "08::07::2026 11-10-09", + ]), + ), + ( + Arc::new(TimestampMillisecondArray::from(vec![ + date.and_utc().timestamp_millis(), + date2.and_utc().timestamp_millis(), + ])) as ArrayRef, + StringArray::from(vec![ + "%Y::%m::%d %S::%M::%H %f", + "%d::%m::%Y %S-%M-%H %f", + ]), + StringArray::from(vec![ + "2020::01::02 05::04::03 000000000", + "08::07::2026 11-10-09 000000000", + ]), + ), + ( + Arc::new(TimestampMicrosecondArray::from(vec![ + date.and_utc().timestamp_micros(), + date2.and_utc().timestamp_micros(), + ])) as ArrayRef, + StringArray::from(vec![ + "%Y::%m::%d %S::%M::%H %f", + "%d::%m::%Y %S-%M-%H %f", + ]), + StringArray::from(vec![ + "2020::01::02 05::04::03 000012000", + "08::07::2026 11-10-09 000056000", + ]), + ), + ( + Arc::new(TimestampNanosecondArray::from(vec![ + date.and_utc().timestamp_nanos_opt().unwrap(), + 
date2.and_utc().timestamp_nanos_opt().unwrap(), + ])) as ArrayRef, + StringArray::from(vec![ + "%Y::%m::%d %S::%M::%H %f", + "%d::%m::%Y %S-%M-%H %f", + ]), + StringArray::from(vec![ + "2020::01::02 05::04::03 000012345", + "08::07::2026 11-10-09 000056789", + ]), + ), + ]; + + for (value, format, expected) in array_scalar_data { + let result = ToCharFunc::new() + .invoke(&[ + ColumnarValue::Array(value as ArrayRef), + ColumnarValue::Scalar(format), + ]) + .expect("that to_char parsed values without error"); + + if let ColumnarValue::Array(result) = result { + assert_eq!(result.len(), 2); + assert_eq!(&expected as &dyn Array, result.as_ref()); + } else { + panic!("Expected an array value") + } + } + + for (value, format, expected) in array_array_data { + let result = ToCharFunc::new() + .invoke(&[ + ColumnarValue::Array(value), + ColumnarValue::Array(Arc::new(format) as ArrayRef), + ]) + .expect("that to_char parsed values without error"); + + if let ColumnarValue::Array(result) = result { + assert_eq!(result.len(), 2); + assert_eq!(&expected as &dyn Array, result.as_ref()); + } else { + panic!("Expected an array value") + } + } + + // + // Fallible test cases + // + + // invalid number of arguments + let result = ToCharFunc::new() + .invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]); + assert_eq!( + result.err().unwrap().strip_backtrace(), + "Execution error: to_char function requires 2 arguments, got 1" + ); + + // invalid type + let result = ToCharFunc::new().invoke(&[ + ColumnarValue::Scalar(ScalarValue::Int32(Some(1))), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), + ]); + assert_eq!( + result.err().unwrap().strip_backtrace(), + "Execution error: Format for `to_char` must be non-null Utf8, received Timestamp(Nanosecond, None)" + ); + } +} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 0dd6fd2a0710d..d63ad9bb4a3a2 100644 --- a/datafusion/physical-expr/Cargo.toml +++ 
b/datafusion/physical-expr/Cargo.toml @@ -85,11 +85,3 @@ tokio = { workspace = true, features = ["rt-multi-thread"] } [[bench]] harness = false name = "in_list" - -[[bench]] -harness = false -name = "make_date" - -[[bench]] -harness = false -name = "to_char" diff --git a/datafusion/physical-expr/benches/make_date.rs b/datafusion/physical-expr/benches/make_date.rs deleted file mode 100644 index 819d9539f2ce3..0000000000000 --- a/datafusion/physical-expr/benches/make_date.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -extern crate criterion; - -use std::sync::Arc; - -use arrow_array::{ArrayRef, Int32Array}; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use rand::rngs::ThreadRng; -use rand::Rng; - -use datafusion_common::ScalarValue; -use datafusion_expr::ColumnarValue; -use datafusion_physical_expr::datetime_expressions::make_date; - -fn years(rng: &mut ThreadRng) -> Int32Array { - let mut years = vec![]; - for _ in 0..1000 { - years.push(rng.gen_range(1900..2050)); - } - - Int32Array::from(years) -} - -fn months(rng: &mut ThreadRng) -> Int32Array { - let mut months = vec![]; - for _ in 0..1000 { - months.push(rng.gen_range(1..13)); - } - - Int32Array::from(months) -} - -fn days(rng: &mut ThreadRng) -> Int32Array { - let mut days = vec![]; - for _ in 0..1000 { - days.push(rng.gen_range(1..29)); - } - - Int32Array::from(days) -} -fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("make_date_col_col_col_1000", |b| { - let mut rng = rand::thread_rng(); - let years = ColumnarValue::Array(Arc::new(years(&mut rng)) as ArrayRef); - let months = ColumnarValue::Array(Arc::new(months(&mut rng)) as ArrayRef); - let days = ColumnarValue::Array(Arc::new(days(&mut rng)) as ArrayRef); - - b.iter(|| { - black_box( - make_date(&[years.clone(), months.clone(), days.clone()]) - .expect("make_date should work on valid values"), - ) - }) - }); - - c.bench_function("make_date_scalar_col_col_1000", |b| { - let mut rng = rand::thread_rng(); - let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); - let months = ColumnarValue::Array(Arc::new(months(&mut rng)) as ArrayRef); - let days = ColumnarValue::Array(Arc::new(days(&mut rng)) as ArrayRef); - - b.iter(|| { - black_box( - make_date(&[year.clone(), months.clone(), days.clone()]) - .expect("make_date should work on valid values"), - ) - }) - }); - - c.bench_function("make_date_scalar_scalar_col_1000", |b| { - let mut rng = rand::thread_rng(); - let year = 
ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); - let month = ColumnarValue::Scalar(ScalarValue::Int32(Some(11))); - let days = ColumnarValue::Array(Arc::new(days(&mut rng)) as ArrayRef); - - b.iter(|| { - black_box( - make_date(&[year.clone(), month.clone(), days.clone()]) - .expect("make_date should work on valid values"), - ) - }) - }); - - c.bench_function("make_date_scalar_scalar_scalar", |b| { - let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); - let month = ColumnarValue::Scalar(ScalarValue::Int32(Some(11))); - let day = ColumnarValue::Scalar(ScalarValue::Int32(Some(26))); - - b.iter(|| { - black_box( - make_date(&[year.clone(), month.clone(), day.clone()]) - .expect("make_date should work on valid values"), - ) - }) - }); -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 6a9a03bec0710..a4c1815137824 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -17,19 +17,10 @@ //! DateTime expressions -use std::sync::Arc; - +use arrow::datatypes::DataType; use arrow::datatypes::TimeUnit; -use arrow::util::display::{ArrayFormatter, DurationFormat, FormatOptions}; -use arrow::{ - array::{Array, ArrayRef, PrimitiveArray}, - datatypes::DataType, -}; -use arrow_array::builder::PrimitiveBuilder; -use arrow_array::cast::AsArray; -use arrow_array::types::{Date32Type, Int32Type}; -use arrow_array::StringArray; use chrono::{DateTime, Datelike, NaiveDate, Utc}; + use datafusion_common::{exec_err, Result, ScalarValue}; use datafusion_expr::ColumnarValue; @@ -82,323 +73,6 @@ pub fn make_current_time( move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(nano))) } -/// Returns a string representation of a date, time, timestamp or duration based -/// on a Chrono pattern. 
-/// -/// The syntax for the patterns can be found at -/// -/// -/// # Examples -/// -/// ```ignore -/// # use chrono::prelude::*; -/// # use datafusion::prelude::*; -/// # use datafusion::error::Result; -/// # use datafusion_common::ScalarValue::TimestampNanosecond; -/// # use std::sync::Arc; -/// # use arrow_array::{Date32Array, RecordBatch, StringArray}; -/// # use arrow_schema::{DataType, Field, Schema}; -/// # #[tokio::main] -/// # async fn main() -> Result<()> { -/// let schema = Arc::new(Schema::new(vec![ -/// Field::new("values", DataType::Date32, false), -/// Field::new("patterns", DataType::Utf8, false), -/// ])); -/// -/// let batch = RecordBatch::try_new( -/// schema, -/// vec![ -/// Arc::new(Date32Array::from(vec![ -/// 18506, -/// 18507, -/// 18508, -/// 18509, -/// ])), -/// Arc::new(StringArray::from(vec![ -/// "%Y-%m-%d", -/// "%Y:%m:%d", -/// "%Y%m%d", -/// "%d-%m-%Y", -/// ])), -/// ], -/// )?; -/// -/// let ctx = SessionContext::new(); -/// ctx.register_batch("t", batch)?; -/// let df = ctx.table("t").await?; -/// -/// // use the to_char function to convert col 'values', -/// // to strings using patterns in col 'patterns' -/// let df = df.with_column( -/// "date_str", -/// to_char(col("values"), col("patterns")) -/// )?; -/// // Note that providing a scalar value for the pattern -/// // is more performant -/// let df = df.with_column( -/// "date_str2", -/// to_char(col("values"), lit("%d-%m-%Y")) -/// )?; -/// // literals can be used as well with dataframe calls -/// let timestamp = "2026-07-08T09:10:11" -/// .parse::() -/// .unwrap() -/// .with_nanosecond(56789) -/// .unwrap() -/// .timestamp_nanos_opt() -/// .unwrap(); -/// let df = df.with_column( -/// "timestamp_str", -/// to_char(lit(TimestampNanosecond(Some(timestamp), None)), lit("%d-%m-%Y %H:%M:%S")) -/// )?; -/// -/// df.show().await?; -/// -/// # Ok(()) -/// # } -/// ``` -pub fn to_char(args: &[ColumnarValue]) -> Result { - if args.len() != 2 { - return exec_err!("to_char function 
requires 2 arguments, got {}", args.len()); - } - - match &args[1] { - // null format, use default formats - ColumnarValue::Scalar(ScalarValue::Utf8(None)) - | ColumnarValue::Scalar(ScalarValue::Null) => { - _to_char_scalar(args[0].clone(), None) - } - // constant format - ColumnarValue::Scalar(ScalarValue::Utf8(Some(format))) => { - // invoke to_char_scalar with the known string, without converting to array - _to_char_scalar(args[0].clone(), Some(format)) - } - ColumnarValue::Array(_) => _to_char_array(args), - _ => { - exec_err!( - "Format for `to_char` must be non-null Utf8, received {:?}", - args[1].data_type() - ) - } - } -} - -fn _build_format_options<'a>( - data_type: &DataType, - format: Option<&'a str>, -) -> Result, Result> { - let Some(format) = format else { - return Ok(FormatOptions::new()); - }; - let format_options = match data_type { - DataType::Date32 => FormatOptions::new().with_date_format(Some(format)), - DataType::Date64 => FormatOptions::new().with_datetime_format(Some(format)), - DataType::Time32(_) => FormatOptions::new().with_time_format(Some(format)), - DataType::Time64(_) => FormatOptions::new().with_time_format(Some(format)), - DataType::Timestamp(_, _) => FormatOptions::new() - .with_timestamp_format(Some(format)) - .with_timestamp_tz_format(Some(format)), - DataType::Duration(_) => FormatOptions::new().with_duration_format( - if "ISO8601".eq_ignore_ascii_case(format) { - DurationFormat::ISO8601 - } else { - DurationFormat::Pretty - }, - ), - other => { - return Err(exec_err!( - "to_char only supports date, time, timestamp and duration data types, received {other:?}" - )); - } - }; - Ok(format_options) -} - -/// Special version when arg\[1] is a scalar -fn _to_char_scalar( - expression: ColumnarValue, - format: Option<&str>, -) -> Result { - // it's possible that the expression is a scalar however because - // of the implementation in arrow-rs we need to convert it to an array - let data_type = &expression.data_type(); - let 
is_scalar_expression = matches!(&expression, ColumnarValue::Scalar(_)); - let array = expression.into_array(1)?; - let format_options = match _build_format_options(data_type, format) { - Ok(value) => value, - Err(value) => return value, - }; - - let formatter = ArrayFormatter::try_new(array.as_ref(), &format_options)?; - let formatted: Result, arrow_schema::ArrowError> = (0..array.len()) - .map(|i| formatter.value(i).try_to_string()) - .collect(); - - if let Ok(formatted) = formatted { - if is_scalar_expression { - Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some( - formatted.first().unwrap().to_string(), - )))) - } else { - Ok(ColumnarValue::Array( - Arc::new(StringArray::from(formatted)) as ArrayRef - )) - } - } else { - exec_err!("{}", formatted.unwrap_err()) - } -} - -fn _to_char_array(args: &[ColumnarValue]) -> Result { - let arrays = ColumnarValue::values_to_arrays(args)?; - let mut results: Vec = vec![]; - let format_array = arrays[1].as_string::(); - let data_type = arrays[0].data_type(); - - for idx in 0..arrays[0].len() { - let format = if format_array.is_null(idx) { - None - } else { - Some(format_array.value(idx)) - }; - let format_options = match _build_format_options(data_type, format) { - Ok(value) => value, - Err(value) => return value, - }; - // this isn't ideal but this can't use ValueFormatter as it isn't independent - // from ArrayFormatter - let formatter = ArrayFormatter::try_new(arrays[0].as_ref(), &format_options)?; - let result = formatter.value(idx).try_to_string(); - match result { - Ok(value) => results.push(value), - Err(e) => return exec_err!("{}", e), - } - } - - match args[0] { - ColumnarValue::Array(_) => Ok(ColumnarValue::Array(Arc::new(StringArray::from( - results, - )) as ArrayRef)), - ColumnarValue::Scalar(_) => Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some( - results.first().unwrap().to_string(), - )))), - } -} - -/// make_date(year, month, day) SQL function implementation -pub fn make_date(args: &[ColumnarValue]) -> Result { 
- if args.len() != 3 { - return exec_err!( - "make_date function requires 3 arguments, got {}", - args.len() - ); - } - - // first, identify if any of the arguments is an Array. If yes, store its `len`, - // as any scalar will need to be converted to an array of len `len`. - let len = args - .iter() - .fold(Option::::None, |acc, arg| match arg { - ColumnarValue::Scalar(_) => acc, - ColumnarValue::Array(a) => Some(a.len()), - }); - - let is_scalar = len.is_none(); - let array_size = if is_scalar { 1 } else { len.unwrap() }; - - let years = args[0].cast_to(&DataType::Int32, None)?; - let months = args[1].cast_to(&DataType::Int32, None)?; - let days = args[2].cast_to(&DataType::Int32, None)?; - - // since the epoch for the date32 datatype is the unix epoch - // we need to subtract the unix epoch from the current date - // note this can result in a negative value - let unix_days_from_ce = NaiveDate::from_ymd_opt(1970, 1, 1) - .unwrap() - .num_days_from_ce(); - - let mut builder: PrimitiveBuilder = PrimitiveArray::builder(array_size); - - let construct_date_fn = |builder: &mut PrimitiveBuilder, - year: i32, - month: i32, - day: i32, - unix_days_from_ce: i32| - -> Result<()> { - let Ok(m) = u32::try_from(month) else { - return exec_err!("Month value '{month:?}' is out of range"); - }; - let Ok(d) = u32::try_from(day) else { - return exec_err!("Day value '{day:?}' is out of range"); - }; - - let date = NaiveDate::from_ymd_opt(year, m, d); - - match date { - Some(d) => builder.append_value(d.num_days_from_ce() - unix_days_from_ce), - None => return exec_err!("Unable to parse date from {year}, {month}, {day}"), - }; - Ok(()) - }; - - let scalar_value_fn = |col: &ColumnarValue| -> Result { - let ColumnarValue::Scalar(s) = col else { - return exec_err!("Expected scalar value"); - }; - let ScalarValue::Int32(Some(i)) = s else { - return exec_err!("Unable to parse date from null/empty value"); - }; - Ok(*i) - }; - - // For scalar only columns the operation is faster without 
using the PrimitiveArray - if is_scalar { - construct_date_fn( - &mut builder, - scalar_value_fn(&years)?, - scalar_value_fn(&months)?, - scalar_value_fn(&days)?, - unix_days_from_ce, - )?; - } else { - let to_primitive_array = |col: &ColumnarValue, - scalar_count: usize| - -> Result> { - match col { - ColumnarValue::Array(a) => Ok(a.as_primitive::().to_owned()), - _ => { - let v = scalar_value_fn(col).unwrap(); - Ok(PrimitiveArray::::from_value(v, scalar_count)) - } - } - }; - - let years = to_primitive_array(&years, array_size).unwrap(); - let months = to_primitive_array(&months, array_size).unwrap(); - let days = to_primitive_array(&days, array_size).unwrap(); - for i in 0..array_size { - construct_date_fn( - &mut builder, - years.value(i), - months.value(i), - days.value(i), - unix_days_from_ce, - )?; - } - } - - let arr = builder.finish(); - - if is_scalar { - // If all inputs are scalar, keeps output as scalar - Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some( - arr.value(0), - )))) - } else { - Ok(ColumnarValue::Array(Arc::new(arr))) - } -} - /// from_unixtime() SQL function implementation pub fn from_unixtime_invoke(args: &[ColumnarValue]) -> Result { if args.len() != 1 { @@ -420,454 +94,3 @@ pub fn from_unixtime_invoke(args: &[ColumnarValue]) -> Result { } } } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::array::{ArrayRef, Int64Array}; - use arrow_array::{ - Date32Array, Date64Array, Int32Array, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - UInt32Array, - }; - use chrono::{NaiveDateTime, Timelike}; - - use datafusion_common::ScalarValue; - - use super::*; - - #[test] - fn test_make_date() { - let res = make_date(&[ - ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))), - ColumnarValue::Scalar(ScalarValue::Int64(Some(1))), - ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), - ]) - 
.expect("that make_date parsed values without error"); - - if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { - assert_eq!(19736, date.unwrap()); - } else { - panic!("Expected a scalar value") - } - - let res = make_date(&[ - ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))), - ColumnarValue::Scalar(ScalarValue::UInt64(Some(1))), - ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), - ]) - .expect("that make_date parsed values without error"); - - if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { - assert_eq!(19736, date.unwrap()); - } else { - panic!("Expected a scalar value") - } - - let res = make_date(&[ - ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))), - ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("1".to_string()))), - ColumnarValue::Scalar(ScalarValue::Utf8(Some("14".to_string()))), - ]) - .expect("that make_date parsed values without error"); - - if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { - assert_eq!(19736, date.unwrap()); - } else { - panic!("Expected a scalar value") - } - - let years = Arc::new((2021..2025).map(Some).collect::()); - let months = Arc::new((1..5).map(Some).collect::()); - let days = Arc::new((11..15).map(Some).collect::()); - let res = make_date(&[ - ColumnarValue::Array(years), - ColumnarValue::Array(months), - ColumnarValue::Array(days), - ]) - .expect("that make_date parsed values without error"); - - if let ColumnarValue::Array(array) = res { - assert_eq!(array.len(), 4); - let mut builder = Date32Array::builder(4); - builder.append_value(18_638); - builder.append_value(19_035); - builder.append_value(19_429); - builder.append_value(19_827); - assert_eq!(&builder.finish() as &dyn Array, array.as_ref()); - } else { - panic!("Expected a columnar array") - } - - // - // Fallible test cases - // - - // invalid number of arguments - let res = make_date(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]); - assert_eq!( - res.err().unwrap().strip_backtrace(), - 
"Execution error: make_date function requires 3 arguments, got 1" - ); - - // invalid type - let res = make_date(&[ - ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))), - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), - ]); - assert_eq!( - res.err().unwrap().strip_backtrace(), - "Arrow error: Cast error: Casting from Interval(YearMonth) to Int32 not supported" - ); - - // overflow of month - let res = make_date(&[ - ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))), - ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))), - ColumnarValue::Scalar(ScalarValue::Int32(Some(22))), - ]); - assert_eq!( - res.err().unwrap().strip_backtrace(), - "Arrow error: Cast error: Can't cast value 18446744073709551615 to type Int32" - ); - - // overflow of day - let res = make_date(&[ - ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))), - ColumnarValue::Scalar(ScalarValue::Int32(Some(22))), - ColumnarValue::Scalar(ScalarValue::UInt32(Some(u32::MAX))), - ]); - assert_eq!( - res.err().unwrap().strip_backtrace(), - "Arrow error: Cast error: Can't cast value 4294967295 to type Int32" - ); - } - - #[test] - fn test_to_char() { - let date = "2020-01-02T03:04:05" - .parse::() - .unwrap() - .with_nanosecond(12345) - .unwrap(); - let date2 = "2026-07-08T09:10:11" - .parse::() - .unwrap() - .with_nanosecond(56789) - .unwrap(); - - let scalar_data = vec![ - ( - ScalarValue::Date32(Some(18506)), - ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), - "2020::09::01".to_string(), - ), - ( - ScalarValue::Date64(Some(date.and_utc().timestamp_millis())), - ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), - "2020::01::02".to_string(), - ), - ( - ScalarValue::Time32Second(Some(31851)), - ScalarValue::Utf8(Some("%H-%M-%S".to_string())), - "08-50-51".to_string(), - ), - ( - ScalarValue::Time32Millisecond(Some(18506000)), - ScalarValue::Utf8(Some("%H-%M-%S".to_string())), - 
"05-08-26".to_string(), - ), - ( - ScalarValue::Time64Microsecond(Some(12344567000)), - ScalarValue::Utf8(Some("%H-%M-%S %f".to_string())), - "03-25-44 567000000".to_string(), - ), - ( - ScalarValue::Time64Nanosecond(Some(12344567890000)), - ScalarValue::Utf8(Some("%H-%M-%S %f".to_string())), - "03-25-44 567890000".to_string(), - ), - ( - ScalarValue::TimestampSecond(Some(date.and_utc().timestamp()), None), - ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H".to_string())), - "2020::01::02 05::04::03".to_string(), - ), - ( - ScalarValue::TimestampMillisecond( - Some(date.and_utc().timestamp_millis()), - None, - ), - ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H".to_string())), - "2020::01::02 05::04::03".to_string(), - ), - ( - ScalarValue::TimestampMicrosecond( - Some(date.and_utc().timestamp_micros()), - None, - ), - ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H %f".to_string())), - "2020::01::02 05::04::03 000012000".to_string(), - ), - ( - ScalarValue::TimestampNanosecond( - Some(date.and_utc().timestamp_nanos_opt().unwrap()), - None, - ), - ScalarValue::Utf8(Some("%Y::%m::%d %S::%M::%H %f".to_string())), - "2020::01::02 05::04::03 000012345".to_string(), - ), - ]; - - for (value, format, expected) in scalar_data { - let result = - to_char(&[ColumnarValue::Scalar(value), ColumnarValue::Scalar(format)]) - .expect("that to_char parsed values without error"); - - if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result { - assert_eq!(expected, date.unwrap()); - } else { - panic!("Expected a scalar value") - } - } - - let scalar_array_data = vec![ - ( - ScalarValue::Date32(Some(18506)), - StringArray::from(vec!["%Y::%m::%d".to_string()]), - "2020::09::01".to_string(), - ), - ( - ScalarValue::Date64(Some(date.and_utc().timestamp_millis())), - StringArray::from(vec!["%Y::%m::%d".to_string()]), - "2020::01::02".to_string(), - ), - ( - ScalarValue::Time32Second(Some(31851)), - StringArray::from(vec!["%H-%M-%S".to_string()]), - "08-50-51".to_string(), - ), - ( - 
ScalarValue::Time32Millisecond(Some(18506000)), - StringArray::from(vec!["%H-%M-%S".to_string()]), - "05-08-26".to_string(), - ), - ( - ScalarValue::Time64Microsecond(Some(12344567000)), - StringArray::from(vec!["%H-%M-%S %f".to_string()]), - "03-25-44 567000000".to_string(), - ), - ( - ScalarValue::Time64Nanosecond(Some(12344567890000)), - StringArray::from(vec!["%H-%M-%S %f".to_string()]), - "03-25-44 567890000".to_string(), - ), - ( - ScalarValue::TimestampSecond(Some(date.and_utc().timestamp()), None), - StringArray::from(vec!["%Y::%m::%d %S::%M::%H".to_string()]), - "2020::01::02 05::04::03".to_string(), - ), - ( - ScalarValue::TimestampMillisecond( - Some(date.and_utc().timestamp_millis()), - None, - ), - StringArray::from(vec!["%Y::%m::%d %S::%M::%H".to_string()]), - "2020::01::02 05::04::03".to_string(), - ), - ( - ScalarValue::TimestampMicrosecond( - Some(date.and_utc().timestamp_micros()), - None, - ), - StringArray::from(vec!["%Y::%m::%d %S::%M::%H %f".to_string()]), - "2020::01::02 05::04::03 000012000".to_string(), - ), - ( - ScalarValue::TimestampNanosecond( - Some(date.and_utc().timestamp_nanos_opt().unwrap()), - None, - ), - StringArray::from(vec!["%Y::%m::%d %S::%M::%H %f".to_string()]), - "2020::01::02 05::04::03 000012345".to_string(), - ), - ]; - - for (value, format, expected) in scalar_array_data { - let result = to_char(&[ - ColumnarValue::Scalar(value), - ColumnarValue::Array(Arc::new(format) as ArrayRef), - ]) - .expect("that to_char parsed values without error"); - - if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result { - assert_eq!(expected, date.unwrap()); - } else { - panic!("Expected a scalar value") - } - } - - let array_scalar_data = vec![ - ( - Arc::new(Date32Array::from(vec![18506, 18507])) as ArrayRef, - ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), - StringArray::from(vec!["2020::09::01", "2020::09::02"]), - ), - ( - Arc::new(Date64Array::from(vec![ - date.and_utc().timestamp_millis(), - 
date2.and_utc().timestamp_millis(), - ])) as ArrayRef, - ScalarValue::Utf8(Some("%Y::%m::%d".to_string())), - StringArray::from(vec!["2020::01::02", "2026::07::08"]), - ), - ]; - - let array_array_data = vec![ - ( - Arc::new(Date32Array::from(vec![18506, 18507])) as ArrayRef, - StringArray::from(vec!["%Y::%m::%d", "%d::%m::%Y"]), - StringArray::from(vec!["2020::09::01", "02::09::2020"]), - ), - ( - Arc::new(Date64Array::from(vec![ - date.and_utc().timestamp_millis(), - date2.and_utc().timestamp_millis(), - ])) as ArrayRef, - StringArray::from(vec!["%Y::%m::%d", "%d::%m::%Y"]), - StringArray::from(vec!["2020::01::02", "08::07::2026"]), - ), - ( - Arc::new(Time32MillisecondArray::from(vec![1850600, 1860700])) - as ArrayRef, - StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), - StringArray::from(vec!["00:30:50", "00::31::00"]), - ), - ( - Arc::new(Time32SecondArray::from(vec![18506, 18507])) as ArrayRef, - StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), - StringArray::from(vec!["05:08:26", "05::08::27"]), - ), - ( - Arc::new(Time64MicrosecondArray::from(vec![12344567000, 22244567000])) - as ArrayRef, - StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), - StringArray::from(vec!["03:25:44", "06::10::44"]), - ), - ( - Arc::new(Time64NanosecondArray::from(vec![ - 1234456789000, - 2224456789000, - ])) as ArrayRef, - StringArray::from(vec!["%H:%M:%S", "%H::%M::%S"]), - StringArray::from(vec!["00:20:34", "00::37::04"]), - ), - ( - Arc::new(TimestampSecondArray::from(vec![ - date.and_utc().timestamp(), - date2.and_utc().timestamp(), - ])) as ArrayRef, - StringArray::from(vec!["%Y::%m::%d %S::%M::%H", "%d::%m::%Y %S-%M-%H"]), - StringArray::from(vec![ - "2020::01::02 05::04::03", - "08::07::2026 11-10-09", - ]), - ), - ( - Arc::new(TimestampMillisecondArray::from(vec![ - date.and_utc().timestamp_millis(), - date2.and_utc().timestamp_millis(), - ])) as ArrayRef, - StringArray::from(vec![ - "%Y::%m::%d %S::%M::%H %f", - "%d::%m::%Y %S-%M-%H %f", - ]), - 
StringArray::from(vec![ - "2020::01::02 05::04::03 000000000", - "08::07::2026 11-10-09 000000000", - ]), - ), - ( - Arc::new(TimestampMicrosecondArray::from(vec![ - date.and_utc().timestamp_micros(), - date2.and_utc().timestamp_micros(), - ])) as ArrayRef, - StringArray::from(vec![ - "%Y::%m::%d %S::%M::%H %f", - "%d::%m::%Y %S-%M-%H %f", - ]), - StringArray::from(vec![ - "2020::01::02 05::04::03 000012000", - "08::07::2026 11-10-09 000056000", - ]), - ), - ( - Arc::new(TimestampNanosecondArray::from(vec![ - date.and_utc().timestamp_nanos_opt().unwrap(), - date2.and_utc().timestamp_nanos_opt().unwrap(), - ])) as ArrayRef, - StringArray::from(vec![ - "%Y::%m::%d %S::%M::%H %f", - "%d::%m::%Y %S-%M-%H %f", - ]), - StringArray::from(vec![ - "2020::01::02 05::04::03 000012345", - "08::07::2026 11-10-09 000056789", - ]), - ), - ]; - - for (value, format, expected) in array_scalar_data { - let result = to_char(&[ - ColumnarValue::Array(value as ArrayRef), - ColumnarValue::Scalar(format), - ]) - .expect("that to_char parsed values without error"); - - if let ColumnarValue::Array(result) = result { - assert_eq!(result.len(), 2); - assert_eq!(&expected as &dyn Array, result.as_ref()); - } else { - panic!("Expected an array value") - } - } - - for (value, format, expected) in array_array_data { - let result = to_char(&[ - ColumnarValue::Array(value), - ColumnarValue::Array(Arc::new(format) as ArrayRef), - ]) - .expect("that to_char parsed values without error"); - - if let ColumnarValue::Array(result) = result { - assert_eq!(result.len(), 2); - assert_eq!(&expected as &dyn Array, result.as_ref()); - } else { - panic!("Expected an array value") - } - } - - // - // Fallible test cases - // - - // invalid number of arguments - let result = to_char(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]); - assert_eq!( - result.err().unwrap().strip_backtrace(), - "Execution error: to_char function requires 2 arguments, got 1" - ); - - // invalid type - let result = to_char(&[ - 
ColumnarValue::Scalar(ScalarValue::Int32(Some(1))), - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), - ]); - assert_eq!( - result.err().unwrap().strip_backtrace(), - "Execution error: Format for `to_char` must be non-null Utf8, received Timestamp(Nanosecond, None)" - ); - } -} diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index e9ac9bd2d6a24..5057ec4f8d6a1 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -445,8 +445,6 @@ pub fn create_physical_fun( execution_props.query_execution_start_time, )) } - BuiltinScalarFunction::MakeDate => Arc::new(datetime_expressions::make_date), - BuiltinScalarFunction::ToChar => Arc::new(datetime_expressions::to_char), BuiltinScalarFunction::FromUnixtime => { Arc::new(datetime_expressions::from_unixtime_invoke) } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c2a36af2e72d2..4b55bdd7d0c44 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -680,10 +680,10 @@ enum ScalarFunction { ArrayResize = 130; EndsWith = 131; /// 132 was InStr - MakeDate = 133; + /// 133 was MakeDate ArrayReverse = 134; /// 135 is RegexpLike - ToChar = 136; + /// 136 was ToChar /// 137 was ToDate /// 138 was ToUnixtime } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 0ec6de8f4072b..c6738c794d4a5 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22198,9 +22198,7 @@ impl serde::Serialize for ScalarFunction { Self::ArrayDistinct => "ArrayDistinct", Self::ArrayResize => "ArrayResize", Self::EndsWith => "EndsWith", - Self::MakeDate => "MakeDate", Self::ArrayReverse => "ArrayReverse", - Self::ToChar => "ToChar", }; serializer.serialize_str(variant) } @@ -22313,9 +22311,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction 
{ "ArrayDistinct", "ArrayResize", "EndsWith", - "MakeDate", "ArrayReverse", - "ToChar", ]; struct GeneratedVisitor; @@ -22457,9 +22453,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayDistinct" => Ok(ScalarFunction::ArrayDistinct), "ArrayResize" => Ok(ScalarFunction::ArrayResize), "EndsWith" => Ok(ScalarFunction::EndsWith), - "MakeDate" => Ok(ScalarFunction::MakeDate), "ArrayReverse" => Ok(ScalarFunction::ArrayReverse), - "ToChar" => Ok(ScalarFunction::ToChar), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 9b34b084c95df..0d8bce744bc60 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2752,13 +2752,13 @@ pub enum ScalarFunction { ArrayResize = 130, EndsWith = 131, /// / 132 was InStr - MakeDate = 133, - ArrayReverse = 134, - /// / 135 is RegexpLike + /// / 133 was MakeDate /// + /// / 135 is RegexpLike + /// / 136 was ToChar /// / 137 was ToDate /// / 138 was ToUnixtime - ToChar = 136, + ArrayReverse = 134, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -2868,9 +2868,7 @@ impl ScalarFunction { ScalarFunction::ArrayDistinct => "ArrayDistinct", ScalarFunction::ArrayResize => "ArrayResize", ScalarFunction::EndsWith => "EndsWith", - ScalarFunction::MakeDate => "MakeDate", ScalarFunction::ArrayReverse => "ArrayReverse", - ScalarFunction::ToChar => "ToChar", } } /// Creates an enum from field names used in the ProtoBuf definition. 
@@ -2977,9 +2975,7 @@ impl ScalarFunction { "ArrayDistinct" => Some(Self::ArrayDistinct), "ArrayResize" => Some(Self::ArrayResize), "EndsWith" => Some(Self::EndsWith), - "MakeDate" => Some(Self::MakeDate), "ArrayReverse" => Some(Self::ArrayReverse), - "ToChar" => Some(Self::ToChar), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 8dba553b48010..f1aafb314aa27 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -526,11 +526,9 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Strpos => Self::Strpos, ScalarFunction::Substr => Self::Substr, ScalarFunction::ToHex => Self::ToHex, - ScalarFunction::ToChar => Self::ToChar, ScalarFunction::Now => Self::Now, ScalarFunction::CurrentDate => Self::CurrentDate, ScalarFunction::CurrentTime => Self::CurrentTime, - ScalarFunction::MakeDate => Self::MakeDate, ScalarFunction::Uuid => Self::Uuid, ScalarFunction::Translate => Self::Translate, ScalarFunction::Coalesce => Self::Coalesce, @@ -1681,26 +1679,6 @@ pub fn parse_expr( ScalarFunction::ToHex => { Ok(to_hex(parse_expr(&args[0], registry, codec)?)) } - ScalarFunction::MakeDate => { - let args: Vec<_> = args - .iter() - .map(|expr| parse_expr(expr, registry, codec)) - .collect::>()?; - Ok(Expr::ScalarFunction(expr::ScalarFunction::new( - BuiltinScalarFunction::MakeDate, - args, - ))) - } - ScalarFunction::ToChar => { - let args: Vec<_> = args - .iter() - .map(|expr| parse_expr(expr, registry, codec)) - .collect::>()?; - Ok(Expr::ScalarFunction(expr::ScalarFunction::new( - BuiltinScalarFunction::ToChar, - args, - ))) - } ScalarFunction::Now => Ok(now()), ScalarFunction::Translate => Ok(translate( parse_expr(&args[0], registry, codec)?, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 393cc78267710..5d852dff52957 100644 --- 
a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1455,7 +1455,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::Trim => Self::Trim, BuiltinScalarFunction::Ltrim => Self::Ltrim, BuiltinScalarFunction::Rtrim => Self::Rtrim, - BuiltinScalarFunction::ToChar => Self::ToChar, BuiltinScalarFunction::ArraySort => Self::ArraySort, BuiltinScalarFunction::ArrayExcept => Self::ArrayExcept, BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct, @@ -1510,7 +1509,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::Now => Self::Now, BuiltinScalarFunction::CurrentDate => Self::CurrentDate, BuiltinScalarFunction::CurrentTime => Self::CurrentTime, - BuiltinScalarFunction::MakeDate => Self::MakeDate, BuiltinScalarFunction::Translate => Self::Translate, BuiltinScalarFunction::Coalesce => Self::Coalesce, BuiltinScalarFunction::Pi => Self::Pi, From ebe3b2af500bce02dd2ad981905ad00da86d24e9 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Wed, 13 Mar 2024 12:44:58 -0400 Subject: [PATCH 2/4] Update post merge to remove datetime_expressions.rs --- datafusion/expr/src/built_in_function.rs | 2 +- datafusion/functions/src/datetime/mod.rs | 4 +- .../physical-expr/src/datetime_expressions.rs | 96 ------------------- datafusion/physical-expr/src/functions.rs | 4 +- datafusion/physical-expr/src/lib.rs | 1 - docs/source/contributor-guide/index.md | 2 +- 6 files changed, 6 insertions(+), 103 deletions(-) delete mode 100644 datafusion/physical-expr/src/datetime_expressions.rs diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index fc47194198e10..dc8d3d733cb5c 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, OnceLock}; use crate::type_coercion::functions::data_types; use crate::{FuncMonotonicity, Signature, 
TypeSignature, Volatility}; -use arrow::datatypes::{DataType, Field, TimeUnit}; +use arrow::datatypes::{DataType, Field}; use datafusion_common::{plan_err, DataFusionError, Result}; use strum::IntoEnumIterator; diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 0ff2d6e23aab6..a2dfc93b05a39 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -27,10 +27,10 @@ mod current_time; mod date_bin; mod date_part; mod date_trunc; -mod make_date; -mod to_char; mod from_unixtime; +mod make_date; mod now; +mod to_char; mod to_date; mod to_timestamp; mod to_unixtime; diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs deleted file mode 100644 index a4c1815137824..0000000000000 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! 
DateTime expressions - -use arrow::datatypes::DataType; -use arrow::datatypes::TimeUnit; -use chrono::{DateTime, Datelike, NaiveDate, Utc}; - -use datafusion_common::{exec_err, Result, ScalarValue}; -use datafusion_expr::ColumnarValue; - -/// Create an implementation of `now()` that always returns the -/// specified timestamp. -/// -/// The semantics of `now()` require it to return the same value -/// wherever it appears within a single statement. This value is -/// chosen during planning time. -pub fn make_now( - now_ts: DateTime, -) -> impl Fn(&[ColumnarValue]) -> Result { - let now_ts = now_ts.timestamp_nanos_opt(); - move |_arg| { - Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - now_ts, - Some("+00:00".into()), - ))) - } -} - -/// Create an implementation of `current_date()` that always returns the -/// specified current date. -/// -/// The semantics of `current_date()` require it to return the same value -/// wherever it appears within a single statement. This value is -/// chosen during planning time. -pub fn make_current_date( - now_ts: DateTime, -) -> impl Fn(&[ColumnarValue]) -> Result { - let days = Some( - now_ts.num_days_from_ce() - - NaiveDate::from_ymd_opt(1970, 1, 1) - .unwrap() - .num_days_from_ce(), - ); - move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Date32(days))) -} - -/// Create an implementation of `current_time()` that always returns the -/// specified current time. -/// -/// The semantics of `current_time()` require it to return the same value -/// wherever it appears within a single statement. This value is -/// chosen during planning time. 
-pub fn make_current_time( - now_ts: DateTime, -) -> impl Fn(&[ColumnarValue]) -> Result { - let nano = now_ts.timestamp_nanos_opt().map(|ts| ts % 86400000000000); - move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(nano))) -} - -/// from_unixtime() SQL function implementation -pub fn from_unixtime_invoke(args: &[ColumnarValue]) -> Result { - if args.len() != 1 { - return exec_err!( - "from_unixtime function requires 1 argument, got {}", - args.len() - ); - } - - match args[0].data_type() { - DataType::Int64 => { - args[0].cast_to(&DataType::Timestamp(TimeUnit::Second, None), None) - } - other => { - exec_err!( - "Unsupported data type {:?} for function from_unixtime", - other - ) - } - } -} diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index b9d5dd1eb7239..8f26484bca9a2 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -32,8 +32,8 @@ use crate::sort_properties::SortProperties; use crate::{ - array_expressions, conditional_expressions, datetime_expressions, math_expressions, - string_expressions, PhysicalExpr, ScalarFunctionExpr, + array_expressions, conditional_expressions, math_expressions, string_expressions, + PhysicalExpr, ScalarFunctionExpr, }; use arrow::{ array::ArrayRef, diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 07bccf25c86ac..7ae5e05b210a5 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -22,7 +22,6 @@ pub mod binary_map; pub mod conditional_expressions; #[cfg(feature = "crypto_expressions")] pub mod crypto_expressions; -pub mod datetime_expressions; pub mod equivalence; pub mod expressions; pub mod functions; diff --git a/docs/source/contributor-guide/index.md b/docs/source/contributor-guide/index.md index 5545ef52be643..9d3a177be6bd9 100644 --- a/docs/source/contributor-guide/index.md +++ b/docs/source/contributor-guide/index.md @@ 
-279,7 +279,7 @@ Below is a checklist of what you need to do to add a new aggregate function to D - Add the actual implementation of an `Accumulator` and `AggregateExpr`: - [here](../../../datafusion/physical-expr/src/string_expressions.rs) for string functions - [here](../../../datafusion/physical-expr/src/math_expressions.rs) for math functions - - [here](../../../datafusion/physical-expr/src/datetime_expressions.rs) for datetime functions + - [here](../../../datafusion/functions/src/datetime/mod.rs) for datetime functions - create a new module [here](../../../datafusion/physical-expr/src) for other functions - In [datafusion/expr/src](../../../datafusion/expr/src/aggregate_function.rs), add: - a new variant to `AggregateFunction` From c9b67bd69b0020a403f65acd13e7051fa61368e4 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 14 Mar 2024 08:52:50 -0400 Subject: [PATCH 3/4] Fix benchmarks. --- datafusion/functions/benches/make_date.rs | 90 ++++++++++++----------- datafusion/functions/benches/to_char.rs | 70 +++++++++++------- 2 files changed, 90 insertions(+), 70 deletions(-) diff --git a/datafusion/functions/benches/make_date.rs b/datafusion/functions/benches/make_date.rs index e64d49d3cf6c4..0e1fabb3f6af4 100644 --- a/datafusion/functions/benches/make_date.rs +++ b/datafusion/functions/benches/make_date.rs @@ -17,93 +17,97 @@ extern crate criterion; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use rand::rngs::ThreadRng; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int32Array}; +use criterion::{black_box, Criterion, criterion_group, criterion_main}; use rand::Rng; +use rand::rngs::ThreadRng; -use datafusion_expr::{lit, Expr}; -use datafusion_functions::expr_fn::make_date; +use datafusion_common::ScalarValue; +use datafusion_expr::ColumnarValue; +use datafusion_functions::datetime::make_date; -fn years(rng: &mut ThreadRng) -> Vec { +fn years(rng: &mut ThreadRng) -> Int32Array { let mut years = vec![]; for _ in 0..1000 { - 
years.push(lit(rng.gen_range(1900..2050))); + years.push(rng.gen_range(1900..2050)); } - years + Int32Array::from(years) } -fn months(rng: &mut ThreadRng) -> Vec { +fn months(rng: &mut ThreadRng) -> Int32Array { let mut months = vec![]; for _ in 0..1000 { - months.push(lit(rng.gen_range(1..13))); + months.push(rng.gen_range(1..13)); } - months + Int32Array::from(months) } -fn days(rng: &mut ThreadRng) -> Vec { +fn days(rng: &mut ThreadRng) -> Int32Array { let mut days = vec![]; for _ in 0..1000 { - days.push(lit(rng.gen_range(1..29))); + days.push(rng.gen_range(1..29)); } - days + Int32Array::from(days) } fn criterion_benchmark(c: &mut Criterion) { c.bench_function("make_date_col_col_col_1000", |b| { let mut rng = rand::thread_rng(); - let years = years(&mut rng); - let months = months(&mut rng); - let days = days(&mut rng); + let years = ColumnarValue::Array(Arc::new(years(&mut rng)) as ArrayRef); + let months = ColumnarValue::Array(Arc::new(months(&mut rng)) as ArrayRef); + let days = ColumnarValue::Array(Arc::new(days(&mut rng)) as ArrayRef); b.iter(|| { - years.iter().enumerate().for_each(|(idx, i)| { - black_box(make_date( - i.clone(), - months.get(idx).unwrap().clone(), - days.get(idx).unwrap().clone(), - )); - }) + black_box( + make_date().invoke(&[years.clone(), months.clone(), days.clone()]) + .expect("make_date should work on valid values"), + ) }) }); c.bench_function("make_date_scalar_col_col_1000", |b| { let mut rng = rand::thread_rng(); - let year = lit(2025); - let months = months(&mut rng); - let days = days(&mut rng); + let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); + let months = ColumnarValue::Array(Arc::new(months(&mut rng)) as ArrayRef); + let days = ColumnarValue::Array(Arc::new(days(&mut rng)) as ArrayRef); b.iter(|| { - months.iter().enumerate().for_each(|(idx, i)| { - black_box(make_date( - year.clone(), - i.clone(), - days.get(idx).unwrap().clone(), - )); - }) + black_box( + make_date().invoke(&[year.clone(), 
months.clone(), days.clone()]) + .expect("make_date should work on valid values"), + ) }) }); c.bench_function("make_date_scalar_scalar_col_1000", |b| { let mut rng = rand::thread_rng(); - let year = lit(2025); - let months = lit(11); - let days = days(&mut rng); + let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); + let month = ColumnarValue::Scalar(ScalarValue::Int32(Some(11))); + let days = ColumnarValue::Array(Arc::new(days(&mut rng)) as ArrayRef); b.iter(|| { - days.iter().for_each(|i| { - black_box(make_date(year.clone(), months.clone(), i.clone())); - }) + black_box( + make_date().invoke(&[year.clone(), month.clone(), days.clone()]) + .expect("make_date should work on valid values"), + ) }) }); c.bench_function("make_date_scalar_scalar_scalar", |b| { - let year = lit(2025); - let month = lit(11); - let day = lit(26); + let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); + let month = ColumnarValue::Scalar(ScalarValue::Int32(Some(11))); + let day = ColumnarValue::Scalar(ScalarValue::Int32(Some(26))); - b.iter(|| black_box(make_date(year.clone(), month.clone(), day.clone()))) + b.iter(|| { + black_box( + make_date().invoke(&[year.clone(), month.clone(), day.clone()]) + .expect("make_date should work on valid values"), + ) + }) }); } diff --git a/datafusion/functions/benches/to_char.rs b/datafusion/functions/benches/to_char.rs index ff08537071bb9..645844f63610e 100644 --- a/datafusion/functions/benches/to_char.rs +++ b/datafusion/functions/benches/to_char.rs @@ -17,16 +17,20 @@ extern crate criterion; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Date32Array, StringArray}; use chrono::prelude::*; use chrono::TimeDelta; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use criterion::{black_box, Criterion, criterion_group, criterion_main}; +use rand::Rng; use rand::rngs::ThreadRng; use rand::seq::SliceRandom; -use rand::Rng; +use datafusion_common::ScalarValue; use 
datafusion_common::ScalarValue::TimestampNanosecond; -use datafusion_expr::{lit, Expr}; -use datafusion_functions::expr_fn::to_char; +use datafusion_expr::ColumnarValue; +use datafusion_functions::datetime::to_char; fn random_date_in_range( rng: &mut ThreadRng, @@ -38,8 +42,8 @@ fn random_date_in_range( start_date + TimeDelta::try_days(random_days).unwrap() } -fn data(rng: &mut ThreadRng) -> Vec { - let mut data: Vec = vec![]; +fn data(rng: &mut ThreadRng) -> Date32Array { + let mut data: Vec = vec![]; let unix_days_from_ce = NaiveDate::from_ymd_opt(1970, 1, 1) .unwrap() .num_days_from_ce(); @@ -50,15 +54,16 @@ fn data(rng: &mut ThreadRng) -> Vec { .parse::() .expect("Date should parse"); for _ in 0..1000 { - data.push(lit(random_date_in_range(rng, start_date, end_date) - .num_days_from_ce() - - unix_days_from_ce)); + data.push( + random_date_in_range(rng, start_date, end_date).num_days_from_ce() + - unix_days_from_ce, + ); } - data + Date32Array::from(data) } -fn patterns(rng: &mut ThreadRng) -> Vec { +fn patterns(rng: &mut ThreadRng) -> StringArray { let samples = vec![ "%Y:%m:%d".to_string(), "%d-%m-%Y".to_string(), @@ -66,36 +71,39 @@ fn patterns(rng: &mut ThreadRng) -> Vec { "%Y%m%d".to_string(), "%Y...%m...%d".to_string(), ]; - let mut data: Vec = vec![]; + let mut data: Vec = vec![]; for _ in 0..1000 { - data.push(lit(samples.choose(rng).unwrap().to_string())); + data.push(samples.choose(rng).unwrap().to_string()); } - data + StringArray::from(data) } fn criterion_benchmark(c: &mut Criterion) { c.bench_function("to_char_array_array_1000", |b| { let mut rng = rand::thread_rng(); - let data = data(&mut rng); - let patterns = patterns(&mut rng); + let data = ColumnarValue::Array(Arc::new(data(&mut rng)) as ArrayRef); + let patterns = ColumnarValue::Array(Arc::new(patterns(&mut rng)) as ArrayRef); b.iter(|| { - data.iter().enumerate().for_each(|(idx, i)| { - black_box(to_char(i.clone(), patterns.get(idx).unwrap().clone())); - }) + black_box( + 
to_char().invoke(&[data.clone(), patterns.clone()]) + .expect("to_char should work on valid values"), + ) }) }); c.bench_function("to_char_array_scalar_1000", |b| { let mut rng = rand::thread_rng(); - let data = data(&mut rng); - let patterns = lit("%Y-%m-%d"); + let data = ColumnarValue::Array(Arc::new(data(&mut rng)) as ArrayRef); + let patterns = + ColumnarValue::Scalar(ScalarValue::Utf8(Some("%Y-%m-%d".to_string()))); b.iter(|| { - data.iter().for_each(|i| { - black_box(to_char(i.clone(), patterns.clone())); - }) + black_box( + to_char().invoke(&[data.clone(), patterns.clone()]) + .expect("to_char should work on valid values"), + ) }) }); @@ -108,12 +116,20 @@ fn criterion_benchmark(c: &mut Criterion) { .and_utc() .timestamp_nanos_opt() .unwrap(); - let data = lit(TimestampNanosecond(Some(timestamp), None)); - let pattern = lit("%d-%m-%Y %H:%M:%S"); + let data = ColumnarValue::Scalar(TimestampNanosecond(Some(timestamp), None)); + let pattern = ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "%d-%m-%Y %H:%M:%S".to_string(), + ))); - b.iter(|| black_box(to_char(data.clone(), pattern.clone()))) + b.iter(|| { + black_box( + to_char().invoke(&[data.clone(), pattern.clone()]) + .expect("to_char should work on valid values"), + ) + }) }); } criterion_group!(benches, criterion_benchmark); criterion_main!(benches); + From 8c87ec6dfee79ceca5e25aacc0b43a63b093246f Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 14 Mar 2024 09:35:25 -0400 Subject: [PATCH 4/4] Cargo fmt. 
--- datafusion/functions/benches/make_date.rs | 16 ++++++++++------ datafusion/functions/benches/to_char.rs | 14 ++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/datafusion/functions/benches/make_date.rs b/datafusion/functions/benches/make_date.rs index 0e1fabb3f6af4..7c75277b913eb 100644 --- a/datafusion/functions/benches/make_date.rs +++ b/datafusion/functions/benches/make_date.rs @@ -20,9 +20,9 @@ extern crate criterion; use std::sync::Arc; use arrow_array::{ArrayRef, Int32Array}; -use criterion::{black_box, Criterion, criterion_group, criterion_main}; -use rand::Rng; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; use rand::rngs::ThreadRng; +use rand::Rng; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; @@ -63,7 +63,8 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { black_box( - make_date().invoke(&[years.clone(), months.clone(), days.clone()]) + make_date() + .invoke(&[years.clone(), months.clone(), days.clone()]) .expect("make_date should work on valid values"), ) }) @@ -77,7 +78,8 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { black_box( - make_date().invoke(&[year.clone(), months.clone(), days.clone()]) + make_date() + .invoke(&[year.clone(), months.clone(), days.clone()]) .expect("make_date should work on valid values"), ) }) @@ -91,7 +93,8 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { black_box( - make_date().invoke(&[year.clone(), month.clone(), days.clone()]) + make_date() + .invoke(&[year.clone(), month.clone(), days.clone()]) .expect("make_date should work on valid values"), ) }) @@ -104,7 +107,8 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { black_box( - make_date().invoke(&[year.clone(), month.clone(), day.clone()]) + make_date() + .invoke(&[year.clone(), month.clone(), day.clone()]) .expect("make_date should work on valid values"), ) }) diff --git a/datafusion/functions/benches/to_char.rs 
b/datafusion/functions/benches/to_char.rs index 645844f63610e..cb0374c95bb3c 100644 --- a/datafusion/functions/benches/to_char.rs +++ b/datafusion/functions/benches/to_char.rs @@ -22,10 +22,10 @@ use std::sync::Arc; use arrow_array::{ArrayRef, Date32Array, StringArray}; use chrono::prelude::*; use chrono::TimeDelta; -use criterion::{black_box, Criterion, criterion_group, criterion_main}; -use rand::Rng; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; use rand::rngs::ThreadRng; use rand::seq::SliceRandom; +use rand::Rng; use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::TimestampNanosecond; @@ -87,7 +87,8 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { black_box( - to_char().invoke(&[data.clone(), patterns.clone()]) + to_char() + .invoke(&[data.clone(), patterns.clone()]) .expect("to_char should work on valid values"), ) }) @@ -101,7 +102,8 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { black_box( - to_char().invoke(&[data.clone(), patterns.clone()]) + to_char() + .invoke(&[data.clone(), patterns.clone()]) .expect("to_char should work on valid values"), ) }) @@ -123,7 +125,8 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { black_box( - to_char().invoke(&[data.clone(), pattern.clone()]) + to_char() + .invoke(&[data.clone(), pattern.clone()]) .expect("to_char should work on valid values"), ) }) @@ -132,4 +135,3 @@ fn criterion_benchmark(c: &mut Criterion) { criterion_group!(benches, criterion_benchmark); criterion_main!(benches); -