diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 30ba7de76a471..7d011abff0976 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -480,6 +480,11 @@ impl SessionConfig { self.options.execution.enforce_batch_size_in_joins } + pub fn with_enable_ansi_mode(mut self, enable_ansi_mode: bool) -> Self { + self.options_mut().execution.enable_ansi_mode = enable_ansi_mode; + self + } + /// Convert configuration options to name-value pairs with values /// converted to strings. /// diff --git a/datafusion/functions/src/math/abs.rs b/datafusion/functions/src/math/abs.rs index 081668f7669f6..1b5aaf7745a84 100644 --- a/datafusion/functions/src/math/abs.rs +++ b/datafusion/functions/src/math/abs.rs @@ -50,6 +50,7 @@ macro_rules! make_abs_function { }}; } +#[macro_export] macro_rules! make_try_abs_function { ($ARRAY_TYPE:ident) => {{ |input: &ArrayRef| { @@ -62,7 +63,8 @@ macro_rules! make_try_abs_function { x )) }) - })?; + }) + .and_then(|v| Ok(v.with_data_type(input.data_type().clone())))?; // maintain decimal's precision and scale Ok(Arc::new(res) as ArrayRef) } }}; diff --git a/datafusion/spark/src/function/math/abs.rs b/datafusion/spark/src/function/math/abs.rs index 101291ac5f66e..44b345f27751a 100644 --- a/datafusion/spark/src/function/math/abs.rs +++ b/datafusion/spark/src/function/math/abs.rs @@ -17,13 +17,15 @@ use arrow::array::*; use arrow::datatypes::{DataType, Field, FieldRef}; +use arrow::error::ArrowError; use datafusion_common::{DataFusionError, Result, ScalarValue, internal_err}; use datafusion_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::{ - downcast_named_arg, make_abs_function, make_wrapping_abs_function, + downcast_named_arg, make_abs_function, make_try_abs_function, + make_wrapping_abs_function, }; use std::any::Any; use std::sync::Arc; @@ -34,8 +36,10 @@ use std::sync::Arc; /// Returns the absolute value of input /// Returns NULL if input is NULL, returns NaN if input is NaN. /// -/// TODOs: +/// Differences with DataFusion abs: /// - Spark's ANSI-compliant dialect, when off (i.e. `spark.sql.ansi.enabled=false`), taking absolute value on the minimal value of a signed integer returns the value as is. DataFusion's abs throws "DataFusion error: Arrow error: Compute error" on arithmetic overflow +/// +/// TODOs: /// - Spark's abs also supports ANSI interval types: YearMonthIntervalType and DayTimeIntervalType. DataFusion's abs doesn't. /// #[derive(Debug, PartialEq, Eq, Hash)] @@ -85,19 +89,39 @@ impl ScalarUDFImpl for SparkAbs { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - spark_abs(&args.args) + spark_abs(&args.args, args.config_options.execution.enable_ansi_mode) } } macro_rules! scalar_compute_op { - ($INPUT:ident, $SCALAR_TYPE:ident) => {{ - let result = $INPUT.wrapping_abs(); + ($ENABLE_ANSI_MODE:expr, $INPUT:ident, $SCALAR_TYPE:ident) => {{ + let result = if $ENABLE_ANSI_MODE { + $INPUT.checked_abs().ok_or_else(|| { + ArrowError::ComputeError(format!( + "{} overflow on abs({:?})", + stringify!($SCALAR_TYPE), + $INPUT + )) + })? + } else { + $INPUT.wrapping_abs() + }; Ok(ColumnarValue::Scalar(ScalarValue::$SCALAR_TYPE(Some( result, )))) }}; - ($INPUT:ident, $PRECISION:expr, $SCALE:expr, $SCALAR_TYPE:ident) => {{ - let result = $INPUT.wrapping_abs(); + ($ENABLE_ANSI_MODE:expr, $INPUT:ident, $PRECISION:expr, $SCALE:expr, $SCALAR_TYPE:ident) => {{ + let result = if $ENABLE_ANSI_MODE { + $INPUT.checked_abs().ok_or_else(|| { + ArrowError::ComputeError(format!( + "{} overflow on abs({:?})", + stringify!($SCALAR_TYPE), + $INPUT + )) + })? + } else { + $INPUT.wrapping_abs() + }; Ok(ColumnarValue::Scalar(ScalarValue::$SCALAR_TYPE( Some(result), $PRECISION, @@ -106,7 +130,10 @@ macro_rules! scalar_compute_op { }}; } -pub fn spark_abs(args: &[ColumnarValue]) -> Result { +pub fn spark_abs( + args: &[ColumnarValue], + enable_ansi_mode: bool, +) -> Result { if args.len() != 1 { return internal_err!("abs takes exactly 1 argument, but got: {}", args.len()); } @@ -119,19 +146,35 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result Ok(args[0].clone()), DataType::Int8 => { - let abs_fun = make_wrapping_abs_function!(Int8Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int8Array) + } else { + make_wrapping_abs_function!(Int8Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Int16 => { - let abs_fun = make_wrapping_abs_function!(Int16Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int16Array) + } else { + make_wrapping_abs_function!(Int16Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Int32 => { - let abs_fun = make_wrapping_abs_function!(Int32Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int32Array) + } else { + make_wrapping_abs_function!(Int32Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Int64 => { - let abs_fun = make_wrapping_abs_function!(Int64Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int64Array) + } else { + make_wrapping_abs_function!(Int64Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Float32 => { @@ -143,11 +186,19 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result { - let abs_fun = make_wrapping_abs_function!(Decimal128Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Decimal128Array) + } else { + make_wrapping_abs_function!(Decimal128Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Decimal256(_, _) => { - let abs_fun = make_wrapping_abs_function!(Decimal256Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Decimal256Array) + } else { + make_wrapping_abs_function!(Decimal256Array) + }; abs_fun(array).map(ColumnarValue::Array) } dt => internal_err!("Not supported datatype for Spark ABS: {dt}"), @@ -159,10 +210,10 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result Ok(args[0].clone()), sv if sv.is_null() => Ok(args[0].clone()), - ScalarValue::Int8(Some(v)) => scalar_compute_op!(v, Int8), - ScalarValue::Int16(Some(v)) => scalar_compute_op!(v, Int16), - ScalarValue::Int32(Some(v)) => scalar_compute_op!(v, Int32), - ScalarValue::Int64(Some(v)) => scalar_compute_op!(v, Int64), + ScalarValue::Int8(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int8), + ScalarValue::Int16(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int16), + ScalarValue::Int32(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int32), + ScalarValue::Int64(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int64), ScalarValue::Float32(Some(v)) => { Ok(ColumnarValue::Scalar(ScalarValue::Float32(Some(v.abs())))) } @@ -170,10 +221,10 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result { - scalar_compute_op!(v, *precision, *scale, Decimal128) + scalar_compute_op!(enable_ansi_mode, v, *precision, *scale, Decimal128) } ScalarValue::Decimal256(Some(v), precision, scale) => { - scalar_compute_op!(v, *precision, *scale, Decimal256) + scalar_compute_op!(enable_ansi_mode, v, *precision, *scale, Decimal256) } dt => internal_err!("Not supported datatype for Spark ABS: {dt}"), }, @@ -185,45 +236,51 @@ mod tests { use super::*; use arrow::datatypes::i256; - macro_rules! eval_legacy_mode { - ($TYPE:ident, $VAL:expr) => {{ + macro_rules! eval_with_mode { + ($FLAG:expr, $TYPE:ident, $VAL:expr $(, $RESULT:expr)?) => {{ + eval_with_mode!(@impl $FLAG, $TYPE, $VAL $(, $RESULT)?); + }}; + (@impl $FLAG:expr, $TYPE:ident, $VAL:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL))); - match spark_abs(&[args]) { - Ok(ColumnarValue::Scalar(ScalarValue::$TYPE(Some(result)))) => { - assert_eq!(result, $VAL); + match spark_abs(&[args], $FLAG) { + Err(e) => { + assert!( + e.to_string().contains("overflow on abs"), + "Error message did not match. Actual message: {e}" + ); } _ => unreachable!(), } }}; - ($TYPE:ident, $VAL:expr, $RESULT:expr) => {{ + (@impl $FLAG:expr, $TYPE:ident, $VAL:expr, $RESULT:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL))); - match spark_abs(&[args]) { + match spark_abs(&[args], $FLAG) { Ok(ColumnarValue::Scalar(ScalarValue::$TYPE(Some(result)))) => { assert_eq!(result, $RESULT); } _ => unreachable!(), } }}; - ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr) => {{ + ($FLAG:expr, $TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr $(, $RESULT:expr)?) => {{ + eval_with_mode!(@impl $FLAG, $TYPE, $VAL, $PRECISION, $SCALE $(, $RESULT)?) + }}; + (@impl $FLAG:expr, $TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL), $PRECISION, $SCALE)); - match spark_abs(&[args]) { - Ok(ColumnarValue::Scalar(ScalarValue::$TYPE( - Some(result), - precision, - scale, - ))) => { - assert_eq!(result, $VAL); - assert_eq!(precision, $PRECISION); - assert_eq!(scale, $SCALE); + match spark_abs(&[args], $FLAG) { + Err(e) => { + assert!( + e.to_string().contains("overflow on abs"), + "Error message did not match. Actual message: {e}" + ); } _ => unreachable!(), } }}; - ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr, $RESULT:expr) => {{ + (@impl $FLAG:expr, $TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr, $RESULT:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL), $PRECISION, $SCALE)); - match spark_abs(&[args]) { + match spark_abs(&[args], $FLAG) { Ok(ColumnarValue::Scalar(ScalarValue::$TYPE( Some(result), precision, @@ -235,24 +292,41 @@ mod tests { } _ => unreachable!(), } + }} + } + + macro_rules! eval_legacy_mode { + ($TYPE:ident, $VAL:expr $(, $RESULT:expr)?) => {{ + eval_with_mode!(false, $TYPE, $VAL $(, $RESULT)?) + }}; + ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr $(, $RESULT:expr)?) => {{ + eval_with_mode!(false, $TYPE, $VAL, $PRECISION, $SCALE $(, $RESULT)?) + }}; + } + macro_rules! eval_ansi_mode { + ($TYPE:ident, $VAL:expr $(, $RESULT:expr)?) => {{ + eval_with_mode!(true, $TYPE, $VAL $(, $RESULT)?) + }}; + ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr $(, $RESULT:expr)?) => {{ + eval_with_mode!(true, $TYPE, $VAL, $PRECISION, $SCALE $(, $RESULT)?) }}; } #[test] fn test_abs_scalar_legacy_mode() { // NumericType MIN - eval_legacy_mode!(UInt8, u8::MIN); - eval_legacy_mode!(UInt16, u16::MIN); - eval_legacy_mode!(UInt32, u32::MIN); - eval_legacy_mode!(UInt64, u64::MIN); - eval_legacy_mode!(Int8, i8::MIN); - eval_legacy_mode!(Int16, i16::MIN); - eval_legacy_mode!(Int32, i32::MIN); - eval_legacy_mode!(Int64, i64::MIN); + eval_legacy_mode!(UInt8, u8::MIN, u8::MIN); + eval_legacy_mode!(UInt16, u16::MIN, u16::MIN); + eval_legacy_mode!(UInt32, u32::MIN, u32::MIN); + eval_legacy_mode!(UInt64, u64::MIN, u64::MIN); + eval_legacy_mode!(Int8, i8::MIN, i8::MIN); + eval_legacy_mode!(Int16, i16::MIN, i16::MIN); + eval_legacy_mode!(Int32, i32::MIN, i32::MIN); + eval_legacy_mode!(Int64, i64::MIN, i64::MIN); eval_legacy_mode!(Float32, f32::MIN, f32::MAX); eval_legacy_mode!(Float64, f64::MIN, f64::MAX); - eval_legacy_mode!(Decimal128, i128::MIN, 18, 10); - eval_legacy_mode!(Decimal256, i256::MIN, 10, 2); + eval_legacy_mode!(Decimal128, i128::MIN, 18, 10, i128::MIN); + eval_legacy_mode!(Decimal256, i256::MIN, 10, 2, i256::MIN); // NumericType not MIN eval_legacy_mode!(Int8, -1i8, 1i8); @@ -273,12 +347,49 @@ mod tests { eval_legacy_mode!(Float64, -0.0f64, 0.0f64); } + #[test] + fn test_abs_scalar_ansi_mode() { + eval_ansi_mode!(Int8, i8::MIN); + eval_ansi_mode!(Int16, i16::MIN); + eval_ansi_mode!(Int32, i32::MIN); + eval_ansi_mode!(Int64, i64::MIN); + eval_ansi_mode!(Decimal128, i128::MIN, 18, 10); + eval_ansi_mode!(Decimal256, i256::MIN, 10, 2); + + eval_ansi_mode!(UInt8, u8::MIN, u8::MIN); + eval_ansi_mode!(UInt16, u16::MIN, u16::MIN); + eval_ansi_mode!(UInt32, u32::MIN, u32::MIN); + eval_ansi_mode!(UInt64, u64::MIN, u64::MIN); + eval_ansi_mode!(Float32, f32::MIN, f32::MAX); + eval_ansi_mode!(Float64, f64::MIN, f64::MAX); + + // NumericType, not MIN + eval_ansi_mode!(Int8, -1i8, 1i8); + eval_ansi_mode!(Int16, -1i16, 1i16); + eval_ansi_mode!(Int32, -1i32, 1i32); + eval_ansi_mode!(Int64, -1i64, 1i64); + eval_ansi_mode!(Decimal128, -1i128, 18, 10, 1i128); + eval_ansi_mode!(Decimal128, i128::MIN + 1, 38, 3, i128::MAX); + eval_ansi_mode!(Decimal256, i256::from(-1i8), 10, 2, i256::from(1i8)); + eval_ansi_mode!(Decimal256, i256::MIN + i256::from(1), 76, 7, i256::MAX); + + // Float32, Float64 + eval_ansi_mode!(Float32, f32::NEG_INFINITY, f32::INFINITY); + eval_ansi_mode!(Float32, f32::INFINITY, f32::INFINITY); + eval_ansi_mode!(Float32, 0.0f32, 0.0f32); + eval_ansi_mode!(Float32, -0.0f32, 0.0f32); + eval_ansi_mode!(Float64, f64::NEG_INFINITY, f64::INFINITY); + eval_ansi_mode!(Float64, f64::INFINITY, f64::INFINITY); + eval_ansi_mode!(Float64, 0.0f64, 0.0f64); + eval_ansi_mode!(Float64, -0.0f64, 0.0f64); + } + macro_rules! eval_array_legacy_mode { ($INPUT:expr, $OUTPUT:expr, $FUNC:ident) => {{ let input = $INPUT; let args = ColumnarValue::Array(Arc::new(input)); let expected = $OUTPUT; - match spark_abs(&[args]) { + match spark_abs(&[args], false) { Ok(ColumnarValue::Array(result)) => { let actual = datafusion_common::cast::$FUNC(&result).unwrap(); assert_eq!(actual, &expected); @@ -367,24 +478,187 @@ mod tests { ); eval_array_legacy_mode!( - Decimal128Array::from(vec![Some(i128::MIN), None]) + Decimal128Array::from(vec![Some(i128::MIN), Some(i128::MIN + 1), None]) .with_precision_and_scale(38, 37) .unwrap(), - Decimal128Array::from(vec![Some(i128::MIN), None]) + Decimal128Array::from(vec![Some(i128::MIN), Some(i128::MAX), None]) .with_precision_and_scale(38, 37) .unwrap(), as_decimal128_array ); eval_array_legacy_mode!( - Decimal256Array::from(vec![Some(i256::MIN), None]) - .with_precision_and_scale(5, 2) + Decimal256Array::from(vec![ + Some(i256::MIN), + Some(i256::MINUS_ONE), + Some(i256::MIN + i256::from(1)), + None + ]) + .with_precision_and_scale(5, 2) + .unwrap(), + Decimal256Array::from(vec![ + Some(i256::MIN), + Some(i256::ONE), + Some(i256::MAX), + None + ]) + .with_precision_and_scale(5, 2) + .unwrap(), + as_decimal256_array + ); + } + + macro_rules! eval_array_ansi_mode { + ($INPUT:expr) => {{ + let input = $INPUT; + let args = ColumnarValue::Array(Arc::new(input)); + match spark_abs(&[args], true) { + Err(e) => { + assert!( + e.to_string().contains("overflow on abs"), + "Error message did not match. Actual message: {e}" + ); + } + _ => unreachable!(), + } + }}; + ($INPUT:expr, $OUTPUT:expr, $FUNC:ident) => {{ + let input = $INPUT; + let args = ColumnarValue::Array(Arc::new(input)); + let expected = $OUTPUT; + match spark_abs(&[args], true) { + Ok(ColumnarValue::Array(result)) => { + let actual = datafusion_common::cast::$FUNC(&result).unwrap(); + assert_eq!(actual, &expected); + } + _ => unreachable!(), + } + }}; + } + #[test] + fn test_abs_array_ansi_mode() { + eval_array_ansi_mode!( + UInt64Array::from(vec![Some(u64::MIN), Some(u64::MAX), None]), + UInt64Array::from(vec![Some(u64::MIN), Some(u64::MAX), None]), + as_uint64_array + ); + + eval_array_ansi_mode!(Int8Array::from(vec![ + Some(-1), + Some(i8::MIN), + Some(i8::MAX), + None + ])); + eval_array_ansi_mode!(Int16Array::from(vec![ + Some(-1), + Some(i16::MIN), + Some(i16::MAX), + None + ])); + eval_array_ansi_mode!(Int32Array::from(vec![ + Some(-1), + Some(i32::MIN), + Some(i32::MAX), + None + ])); + eval_array_ansi_mode!(Int64Array::from(vec![ + Some(-1), + Some(i64::MIN), + Some(i64::MAX), + None + ])); + eval_array_ansi_mode!( + Float32Array::from(vec![ + Some(-1f32), + Some(f32::MIN), + Some(f32::MAX), + None, + Some(f32::NAN), + Some(f32::INFINITY), + Some(f32::NEG_INFINITY), + Some(0.0), + Some(-0.0), + ]), + Float32Array::from(vec![ + Some(1f32), + Some(f32::MAX), + Some(f32::MAX), + None, + Some(f32::NAN), + Some(f32::INFINITY), + Some(f32::INFINITY), + Some(0.0), + Some(0.0), + ]), + as_float32_array + ); + + eval_array_ansi_mode!( + Float64Array::from(vec![ + Some(-1f64), + Some(f64::MIN), + Some(f64::MAX), + None, + Some(f64::NAN), + Some(f64::INFINITY), + Some(f64::NEG_INFINITY), + Some(0.0), + Some(-0.0), + ]), + Float64Array::from(vec![ + Some(1f64), + Some(f64::MAX), + Some(f64::MAX), + None, + Some(f64::NAN), + Some(f64::INFINITY), + Some(f64::INFINITY), + Some(0.0), + Some(0.0), + ]), + as_float64_array + ); + + // decimal: no arithmetic overflow + eval_array_ansi_mode!( + Decimal128Array::from(vec![Some(-1), Some(-2), Some(i128::MIN + 1)]) + .with_precision_and_scale(38, 37) .unwrap(), - Decimal256Array::from(vec![Some(i256::MIN), None]) - .with_precision_and_scale(5, 2) + Decimal128Array::from(vec![Some(1), Some(2), Some(i128::MAX)]) + .with_precision_and_scale(38, 37) .unwrap(), + as_decimal128_array + ); + + eval_array_ansi_mode!( + Decimal256Array::from(vec![ + Some(i256::MINUS_ONE), + Some(i256::from(-2)), + Some(i256::MIN + i256::from(1)) + ]) + .with_precision_and_scale(18, 7) + .unwrap(), + Decimal256Array::from(vec![ + Some(i256::ONE), + Some(i256::from(2)), + Some(i256::MAX) + ]) + .with_precision_and_scale(18, 7) + .unwrap(), as_decimal256_array ); + + // decimal: arithmetic overflow + eval_array_ansi_mode!( + Decimal128Array::from(vec![Some(i128::MIN), None]) + .with_precision_and_scale(38, 37) + .unwrap() + ); + eval_array_ansi_mode!( + Decimal256Array::from(vec![Some(i256::MIN), None]) + .with_precision_and_scale(5, 2) + .unwrap() + ); } #[test] diff --git a/datafusion/sqllogictest/test_files/spark/math/abs.slt b/datafusion/sqllogictest/test_files/spark/math/abs.slt index 19ca902ea3de6..b2025d39c89ff 100644 --- a/datafusion/sqllogictest/test_files/spark/math/abs.slt +++ b/datafusion/sqllogictest/test_files/spark/math/abs.slt @@ -38,6 +38,25 @@ select abs((-128)::TINYINT), abs((-32768)::SMALLINT), abs((-2147483648)::INT), a ---- -128 -32768 -2147483648 -9223372036854775808 +# abs: Spark ANSI mode +statement ok +set datafusion.execution.enable_ansi_mode = true; + +query error DataFusion error: Arrow error: Compute error: Int8 overflow on abs\(\-128\) +select abs((-128)::TINYINT) + +query error DataFusion error: Arrow error: Compute error: Int16 overflow on abs\(\-32768\) +select abs((-32768)::SMALLINT) + +query error DataFusion error: Arrow error: Compute error: Int32 overflow on abs\(\-2147483648\) +select abs((-2147483648)::INT) + +query error DataFusion error: Arrow error: Compute error: Int64 overflow on abs\(\-9223372036854775808\) +select abs((-9223372036854775808)::BIGINT) + +statement ok +set datafusion.execution.enable_ansi_mode = false; + # abs: floats, NULL, NaN, -0, infinity, -infinity query RRRRRRRRRRRR SELECT abs(-1.0::FLOAT), abs(0.::FLOAT), abs(-0.::FLOAT), abs(-0::FLOAT), abs(NULL::FLOAT), abs('NaN'::FLOAT), abs('inf'::FLOAT), abs('+inf'::FLOAT), abs('-inf'::FLOAT), abs('infinity'::FLOAT), abs('+infinity'::FLOAT), abs('-infinity'::FLOAT)