From 0e041ca49a89e4ec8aac7386c6ab3de589366495 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Tue, 5 Aug 2025 17:17:04 -0700
Subject: [PATCH 01/14] init_commit

---
 native/core/src/execution/planner.rs          | 32 ++++++++-
 native/spark-expr/src/comet_scalar_funcs.rs   |  5 ++
 .../spark-expr/src/math_funcs/checked_add.rs  | 65 +++++++++++++++++++
 native/spark-expr/src/math_funcs/mod.rs       |  1 +
 .../org/apache/comet/serde/arithmetic.scala   | 20 ------
 .../apache/comet/CometExpressionSuite.scala   |  4 +-
 6 files changed, 102 insertions(+), 25 deletions(-)
 create mode 100644 native/spark-expr/src/math_funcs/checked_add.rs

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 7fa7bfe905..3df7de9796 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -238,7 +238,7 @@ impl PhysicalPlanner {
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         match spark_expr.expr_struct.as_ref().unwrap() {
             ExprStruct::Add(expr) => {
-                // TODO respect eval mode
+                // TODO respect ANSI eval mode
                 // https://github.com/apache/datafusion-comet/issues/2021
                 // https://github.com/apache/datafusion-comet/issues/536
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
@@ -248,6 +248,7 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Plus,
                     input_schema,
+                    _eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::Subtract(expr) => {
@@ -261,6 +262,7 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Minus,
                     input_schema,
+                    true,
                 )
             }
             ExprStruct::Multiply(expr) => {
@@ -274,6 +276,7 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Multiply,
                     input_schema,
+                    true,
                 )
             }
             ExprStruct::Divide(expr) => {
@@ -287,6 +290,7 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Divide,
                     input_schema,
+                    true,
                 )
             }
             ExprStruct::IntegralDivide(expr) => {
@@ -303,6 +307,7 @@
                     BinaryExprOptions {
                         is_integral_div: true,
                     },
+                    false,
                 )
             }
             ExprStruct::Remainder(expr) => {
@@ -1004,6 +1009,7 @@ impl PhysicalPlanner {
         return_type: Option<&spark_expression::DataType>,
         op: DataFusionOperator,
         input_schema: SchemaRef,
+        fail_on_overflow: bool,
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         self.create_binary_expr_with_options(
             left,
@@ -1012,6 +1018,7 @@
             op,
             input_schema,
             BinaryExprOptions::default(),
+            fail_on_overflow,
         )
     }
 
@@ -1023,6 +1030,7 @@
         op: DataFusionOperator,
         input_schema: SchemaRef,
         options: BinaryExprOptions,
+        fail_on_overflow: bool,
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         let left = self.create_expr(left, Arc::clone(&input_schema))?;
         let right = self.create_expr(right, Arc::clone(&input_schema))?;
@@ -1057,7 +1065,7 @@
                 Ok(Arc::new(Cast::new(
                     child,
                     data_type,
-                    SparkCastOptions::new_without_timezone(EvalMode::Legacy, false),
+                    SparkCastOptions::new_without_timezone(EvalMode::Try, false),
                 )))
             }
             (
@@ -1087,7 +1095,25 @@
                     Arc::new(Field::new(func_name, data_type, true)),
                 )))
             }
-            _ => Ok(Arc::new(BinaryExpr::new(left, op, right))),
+            _ => {
+                if !fail_on_overflow && op == DataFusionOperator::Plus {
+                    let data_type = return_type.map(to_arrow_datatype).unwrap();
+                    let fun_expr = create_comet_physical_fun(
+                        "checked_add",
+                        data_type.clone(),
+                        &self.session_ctx.state(),
+                        None,
+                    )?;
+                    Ok(Arc::new(ScalarFunctionExpr::new(
+                        "checked_add",
+                        fun_expr,
+                        vec![left, right],
+                        Arc::new(Field::new("checked_add", data_type, true)),
+                    )))
+                } else {
+                    Ok(Arc::new(BinaryExpr::new(left, op, right)))
+                }
+            }
         }
     }
 
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 961309b788..fec6a54e6d 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 use crate::hash_funcs::*;
+use crate::math_funcs::checked_add;
+use crate::math_funcs::checked_add::checked_add;
 use crate::math_funcs::modulo_expr::spark_modulo;
 use crate::{
     spark_array_repeat, spark_ceil, spark_date_add, spark_date_sub, spark_decimal_div,
@@ -115,6 +117,9 @@ pub fn create_comet_physical_fun(
                 data_type
             )
         }
+        "checked_add" => {
+            make_comet_scalar_udf!("checked_add", checked_add, data_type)
+        }
         "murmur3_hash" => {
             let func = Arc::new(spark_murmur3_hash);
             make_comet_scalar_udf!("murmur3_hash", func, without data_type)
diff --git a/native/spark-expr/src/math_funcs/checked_add.rs b/native/spark-expr/src/math_funcs/checked_add.rs
new file mode 100644
index 0000000000..03379ca7fc
--- /dev/null
+++ b/native/spark-expr/src/math_funcs/checked_add.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, Int32Array, PrimitiveArray};
+use arrow::array::{ArrayRef, AsArray};
+use arrow::datatypes::{DataType, Int32Type};
+use datafusion::common::DataFusionError;
+use datafusion::physical_plan::ColumnarValue;
+use std::sync::Arc;
+
+pub fn checked_add(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+) -> Result<ColumnarValue, DataFusionError> {
+    checked_add_internal(args, data_type)
+}
+
+fn checked_add_internal(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+) -> Result<ColumnarValue, DataFusionError> {
+    let left = &args[0];
+    let right = &args[1];
+
+    let (left, right): (ArrayRef, ArrayRef) = match (left, right) {
+        (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)),
+        (ColumnarValue::Scalar(l), ColumnarValue::Array(r)) => {
+            (l.to_array_of_size(r.len())?, Arc::clone(r))
+        }
+        (ColumnarValue::Array(l), ColumnarValue::Scalar(r)) => {
+            (Arc::clone(l), r.to_array_of_size(l.len())?)
+        }
+        (ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)) => (l.to_array()?, r.to_array()?),
+    };
+
+    let left = left.as_primitive::<Int32Type>();
+    let right = right.as_primitive::<Int32Type>();
+    let mut builder = Int32Array::builder(left.len());
+
+    for i in 0..left.len() {
+        if left.is_null(i) || right.is_null(i) {
+            builder.append_null();
+        } else {
+            let l = left.value(i);
+            let r = right.value(i);
+            builder.append_option(l.checked_add(r));
+        }
+    }
+
+    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+}
diff --git a/native/spark-expr/src/math_funcs/mod.rs b/native/spark-expr/src/math_funcs/mod.rs
index e7e203de59..0fd26cd387 100644
--- a/native/spark-expr/src/math_funcs/mod.rs
+++ b/native/spark-expr/src/math_funcs/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 mod ceil;
+pub(crate) mod checked_add;
 mod div;
 mod floor;
 pub(crate) mod hex;
diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
index 35acf0f356..c5e2558f32 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -94,10 +94,6 @@ object CometAdd extends CometExpressionSerde[Add] with MathBase {
       withInfo(expr, s"Unsupported datatype ${expr.left.dataType}")
       return None
     }
-    if (expr.evalMode == EvalMode.TRY) {
-      withInfo(expr, s"Eval mode ${expr.evalMode} is not supported")
-      return None
-    }
     createMathExpression(
       expr,
       expr.left,
@@ -119,10 +115,6 @@ object CometSubtract extends CometExpressionSerde[Subtract] with MathBase {
       withInfo(expr, s"Unsupported datatype ${expr.left.dataType}")
       return None
     }
-    if (expr.evalMode == EvalMode.TRY) {
-      withInfo(expr, s"Eval mode ${expr.evalMode} is not supported")
-      return None
-    }
     createMathExpression(
       expr,
       expr.left,
@@ -144,10 +136,6 @@ object CometMultiply extends CometExpressionSerde[Multiply] with MathBase {
       withInfo(expr, s"Unsupported datatype ${expr.left.dataType}")
       return None
     }
-    if (expr.evalMode == EvalMode.TRY) {
-      withInfo(expr, s"Eval mode ${expr.evalMode} is not supported")
-      return None
-    }
     createMathExpression(
       expr,
       expr.left,
@@ -174,10 +162,6 @@ object CometDivide extends CometExpressionSerde[Divide] with MathBase {
       withInfo(expr, s"Unsupported datatype ${expr.left.dataType}")
       return None
     }
-    if (expr.evalMode == EvalMode.TRY) {
-      withInfo(expr, s"Eval mode ${expr.evalMode} is not supported")
-      return None
-    }
     createMathExpression(
       expr,
       expr.left,
@@ -199,10 +183,6 @@ object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with Mat
       withInfo(expr, s"Unsupported datatype ${expr.left.dataType}")
       return None
     }
-    if (expr.evalMode == EvalMode.TRY) {
-      withInfo(expr, s"Eval mode ${expr.evalMode} is not supported")
-      return None
-    }
 
     // Precision is set to 19 (max precision for a numerical data type except DecimalType)
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 60435cee7b..bdb78c4fe9 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -317,9 +317,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   test("try_add") {
     // TODO: we need to implement more comprehensive tests for all try_ arithmetic functions
     // https://github.com/apache/datafusion-comet/issues/2021
-    val data = Seq((Integer.MAX_VALUE, 1))
+    val data = Seq((Integer.MAX_VALUE, 1), (1, 1))
     withParquetTable(data, "tbl") {
-      checkSparkAnswer("SELECT try_add(_1, _2) FROM tbl")
+      checkSparkAnswerAndOperator("SELECT try_add(_1, _2) FROM tbl")
     }
   }
 

From 3f7e0e8d59dcc61d47e30b7daca2fe97f650ea9c Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Thu, 7 Aug 2025 00:38:56 -0700
Subject: [PATCH 02/14] try_arithmetic_impl

---
 native/core/src/execution/planner.rs          |  30 ++--
 native/spark-expr/src/comet_scalar_funcs.rs   |  20 ++-
 .../spark-expr/src/math_funcs/checked_add.rs  |  65 -------
 .../src/math_funcs/checked_arithmetic.rs      | 162 ++++++++++++++++++
 native/spark-expr/src/math_funcs/mod.rs       |   2 +-
 .../org/apache/comet/serde/arithmetic.scala   |   1 -
 .../apache/comet/CometExpressionSuite.scala   |  52 +++++-
 7 files changed, 248 insertions(+), 84 deletions(-)
 delete mode 100644 native/spark-expr/src/math_funcs/checked_add.rs
 create mode 100644 native/spark-expr/src/math_funcs/checked_arithmetic.rs

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 3df7de9796..ff8daf08cd 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -262,12 +262,10 @@ impl PhysicalPlanner {
                     expr.return_type.as_ref(),
                     DataFusionOperator::Minus,
                     input_schema,
-                    true,
+                    _eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::Multiply(expr) => {
-                // TODO respect eval mode
-                // https://github.com/apache/datafusion-comet/issues/2021
                 // https://github.com/apache/datafusion-comet/issues/534
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
@@ -276,12 +274,10 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Multiply,
                     input_schema,
-                    true,
+                    _eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::Divide(expr) => {
-                // TODO respect eval mode
-                // https://github.com/apache/datafusion-comet/issues/2021
                 // https://github.com/apache/datafusion-comet/issues/533
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
@@ -290,7 +286,7 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Divide,
                     input_schema,
-                    true,
+                    _eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::IntegralDivide(expr) => {
@@ -1096,19 +1092,29 @@
             }
             _ => {
-                if !fail_on_overflow && op == DataFusionOperator::Plus {
-                    let data_type = return_type.map(to_arrow_datatype).unwrap();
+                let data_type = return_type.map(to_arrow_datatype).unwrap();
+                print!("data type parsed : {}", data_type);
+                if !fail_on_overflow && data_type.is_integer() {
+                    let op_str = match op {
+                        DataFusionOperator::Plus => "checked_add",
+                        DataFusionOperator::Minus => "checked_sub",
+                        DataFusionOperator::Multiply => "checked_mul",
+                        DataFusionOperator::Divide => "checked_div",
+                        _ => {
+                            todo!("Operator with fail_on_overflow is yet to be implemented!");
+                        }
+                    };
                     let fun_expr = create_comet_physical_fun(
-                        "checked_add",
+                        &op_str,
                         data_type.clone(),
                         &self.session_ctx.state(),
                         None,
                     )?;
                     Ok(Arc::new(ScalarFunctionExpr::new(
-                        "checked_add",
+                        &op_str,
                         fun_expr,
                         vec![left, right],
-                        Arc::new(Field::new("checked_add", data_type, true)),
+                        Arc::new(Field::new(op_str, data_type, true)),
                     )))
                 } else {
                     Ok(Arc::new(BinaryExpr::new(left, op, right)))
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index fec6a54e6d..eb0d51d92d 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -16,8 +16,10 @@
 // under the License.
 
 use crate::hash_funcs::*;
-use crate::math_funcs::checked_add;
-use crate::math_funcs::checked_add::checked_add;
+use crate::math_funcs::checked_arithmetic;
+use crate::math_funcs::checked_arithmetic::{
+    checked_add, checked_div, checked_mod, checked_mul, checked_sub,
+};
 use crate::math_funcs::modulo_expr::spark_modulo;
 use crate::{
     spark_array_repeat, spark_ceil, spark_date_add, spark_date_sub, spark_decimal_div,
@@ -43,7 +45,7 @@ macro_rules! make_comet_scalar_udf {
             $name.to_string(),
             Signature::variadic_any(Volatility::Immutable),
             $data_type.clone(),
-            Arc::new(move |args| $func(args, &$data_type)),
+            Arc::new(move |args| $func(args, &$data_type /* &str */)),
         );
         Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
     }};
@@ -120,6 +122,18 @@ pub fn create_comet_physical_fun(
         "checked_add" => {
             make_comet_scalar_udf!("checked_add", checked_add, data_type)
         }
+        "checked_sub" => {
+            make_comet_scalar_udf!("checked_sub", checked_sub, data_type)
+        }
+        "checked_mul" => {
+            make_comet_scalar_udf!("checked_mul", checked_mul, data_type)
+        }
+        "checked_div" => {
+            make_comet_scalar_udf!("checked_div", checked_div, data_type)
+        }
+        "checked_mod" => {
+            make_comet_scalar_udf!("checked_mod", checked_mod, data_type)
+        }
         "murmur3_hash" => {
             let func = Arc::new(spark_murmur3_hash);
             make_comet_scalar_udf!("murmur3_hash", func, without data_type)
diff --git a/native/spark-expr/src/math_funcs/checked_add.rs b/native/spark-expr/src/math_funcs/checked_add.rs
deleted file mode 100644
index 03379ca7fc..0000000000
--- a/native/spark-expr/src/math_funcs/checked_add.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::array::{Array, Int32Array, PrimitiveArray};
-use arrow::array::{ArrayRef, AsArray};
-use arrow::datatypes::{DataType, Int32Type};
-use datafusion::common::DataFusionError;
-use datafusion::physical_plan::ColumnarValue;
-use std::sync::Arc;
-
-pub fn checked_add(
-    args: &[ColumnarValue],
-    data_type: &DataType,
-) -> Result<ColumnarValue, DataFusionError> {
-    checked_add_internal(args, data_type)
-}
-
-fn checked_add_internal(
-    args: &[ColumnarValue],
-    data_type: &DataType,
-) -> Result<ColumnarValue, DataFusionError> {
-    let left = &args[0];
-    let right = &args[1];
-
-    let (left, right): (ArrayRef, ArrayRef) = match (left, right) {
-        (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)),
-        (ColumnarValue::Scalar(l), ColumnarValue::Array(r)) => {
-            (l.to_array_of_size(r.len())?, Arc::clone(r))
-        }
-        (ColumnarValue::Array(l), ColumnarValue::Scalar(r)) => {
-            (Arc::clone(l), r.to_array_of_size(l.len())?)
-        }
-        (ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)) => (l.to_array()?, r.to_array()?),
-    };
-
-    let left = left.as_primitive::<Int32Type>();
-    let right = right.as_primitive::<Int32Type>();
-    let mut builder = Int32Array::builder(left.len());
-
-    for i in 0..left.len() {
-        if left.is_null(i) || right.is_null(i) {
-            builder.append_null();
-        } else {
-            let l = left.value(i);
-            let r = right.value(i);
-            builder.append_option(l.checked_add(r));
-        }
-    }
-
-    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
-}
diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
new file mode 100644
index 0000000000..f883595592
--- /dev/null
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    Array, ArrowNativeTypeOp, Int32Array, Int32Builder, PrimitiveArray, PrimitiveBuilder,
+};
+use arrow::array::{ArrayRef, AsArray};
+
+use arrow::compute::try_binary;
+use arrow::datatypes::{ArrowPrimitiveType, DataType, Int32Type, Int64Type};
+use arrow::error::ArrowError;
+use datafusion::common::{DataFusionError, ExprSchema};
+use datafusion::physical_plan::ColumnarValue;
+use num::traits::CheckedRem;
+use num::{CheckedAdd, CheckedDiv, CheckedMul, CheckedSub};
+use std::sync::Arc;
+
+fn checked_arithmetic_impl<T>(left_arr: &ArrayRef, right_arr: &ArrayRef, op_kind: &str) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: CheckedAdd + CheckedSub + CheckedMul + CheckedDiv + CheckedRem + Copy,
+{
+    let left = left_arr.as_primitive::<T>();
+    let right = right_arr.as_primitive::<T>();
+    let mut builder = PrimitiveBuilder::<T>::with_capacity(left.len());
+
+    for i in 0..left.len() {
+        if left.is_null(i) || right.is_null(i) {
+            builder.append_null();
+        } else {
+            let l = left.value(i);
+            let r = right.value(i);
+            let val = match op_kind {
+                "checked_add" => l.checked_add(&r),
+                "checked_sub" => l.checked_sub(&r),
+                "checked_mul" => l.checked_mul(&r),
+                _ => todo!("Unsupported operation: {}", op_kind),
+            };
+            builder.append_option(val);
+        }
+    }
+    Arc::new(builder.finish()) as ArrayRef
+}
+
+pub fn try_arithmetic_kernel<T>(
+    left: &PrimitiveArray<T>,
+    right: &PrimitiveArray<T>,
+    op: &str,
+) -> Result<ArrayRef, DataFusionError>
+where
+    T: ArrowPrimitiveType,
+{
+    let len = left.len();
+    let mut builder = PrimitiveBuilder::<T>::with_capacity(len);
+
+    for i in 0..len {
+        if left.is_null(i) || right.is_null(i) {
+            builder.append_null();
+        } else {
+            let l = left.value(i);
+            let r = right.value(i);
+
+            match op {
+                "checked_add" => builder.append_option(l.add_checked(r).ok()),
+                "checked_sub" => builder.append_option(l.sub_checked(r).ok()),
+                "checked_mul" => builder.append_option(l.mul_checked(r).ok()),
+                _ => todo!("Unsupported operation: {}", op),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()) as ArrayRef)
+}
+
+pub fn checked_add(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+) -> Result<ColumnarValue, DataFusionError> {
+    checked_arithmetic_internal(args, data_type, "checked_add")
+}
+
+pub fn checked_sub(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+) -> Result<ColumnarValue, DataFusionError> {
+    checked_arithmetic_internal(args, data_type, "checked_sub")
+}
+
+pub fn checked_mul(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+) -> Result<ColumnarValue, DataFusionError> {
+    checked_arithmetic_internal(args, data_type, "checked_mul")
+}
+
+pub fn checked_div(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+) -> Result<ColumnarValue, DataFusionError> {
+    checked_arithmetic_internal(args, data_type, "checked_div")
+}
+
+pub fn checked_mod(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+) -> Result<ColumnarValue, DataFusionError> {
+    checked_arithmetic_internal(args, data_type, "checked_mod")
+}
+
+fn checked_arithmetic_internal(
+    args: &[ColumnarValue],
+    data_type: &DataType,
+    op: &str,
+) -> Result<ColumnarValue, DataFusionError> {
+    let left = &args[0];
+    let right = &args[1];
+
+    let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (left, right) {
+        (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)),
+        (ColumnarValue::Scalar(l), ColumnarValue::Array(r)) => {
+            (l.to_array_of_size(r.len())?, Arc::clone(r))
+        }
+        (ColumnarValue::Array(l), ColumnarValue::Scalar(r)) => {
+            (Arc::clone(l), r.to_array_of_size(l.len())?)
+        }
+        (ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)) => (l.to_array()?, r.to_array()?),
+    };
+
+    // Rust only supports checked_arithmetic on Int32 and Int64
+    let result_array = match data_type {
+        DataType::Int32 => try_arithmetic_kernel::<Int32Type>(
+            left_arr.as_primitive::<Int32Type>(),
+            right_arr.as_primitive::<Int32Type>(),
+            op,
+        ),
+        DataType::Int64 => try_arithmetic_kernel::<Int64Type>(
+            left_arr.as_primitive::<Int64Type>(),
+            right_arr.as_primitive::<Int64Type>(),
+            op,
+        ),
+        _ => Err(DataFusionError::Internal(format!(
+            "Unsupported data type: {:?}",
+            data_type
+        ))),
+    };
+
+    Ok(ColumnarValue::Array(result_array?))
+}
diff --git a/native/spark-expr/src/math_funcs/mod.rs b/native/spark-expr/src/math_funcs/mod.rs
index 0fd26cd387..873b290ebd 100644
--- a/native/spark-expr/src/math_funcs/mod.rs
+++ b/native/spark-expr/src/math_funcs/mod.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 mod ceil;
-pub(crate) mod checked_add;
+pub(crate) mod checked_arithmetic;
 mod div;
 mod floor;
 pub(crate) mod hex;
diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
index c5e2558f32..63160d6ef7 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -157,7 +157,6 @@ object CometDivide extends CometExpressionSerde[Divide] with MathBase {
     // See https://github.com/apache/arrow-datafusion/pull/6792
     // For now, use NullIf to swap zeros with nulls.
     val rightExpr = nullIfWhenPrimitive(expr.right)
-
     if (!supportedDataType(expr.left.dataType)) {
       withInfo(expr, s"Unsupported datatype ${expr.left.dataType}")
       return None
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index bdb78c4fe9..fe82531d3f 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -315,12 +315,60 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("try_add") {
-    // TODO: we need to implement more comprehensive tests for all try_ arithmetic functions
-    // https://github.com/apache/datafusion-comet/issues/2021
     val data = Seq((Integer.MAX_VALUE, 1), (1, 1))
     withParquetTable(data, "tbl") {
       checkSparkAnswerAndOperator("SELECT try_add(_1, _2) FROM tbl")
     }
+    withTable("t1") {
+      val value = Long.MaxValue
+      sql("create table t1(c1 long, c2 long) using parquet")
+      sql(s"insert into t1 values($value, 1)")
+      val res = sql("select try_add(c1, c2) from t1 order by c1")
+      res.show()
+      checkSparkAnswerAndOperator(res)
+    }
+  }
+
+  test("try_subtract") {
+    val data = Seq((Integer.MAX_VALUE, 1), (1, 1))
+    withParquetTable(data, "tbl") {
+      checkSparkAnswerAndOperator("SELECT try_subtract(_1, _2) FROM tbl")
+    }
+    withTable("t1") {
+      val value = Long.MinValue
+      sql("create table t1(c1 long, c2 long) using parquet")
+      sql(s"insert into t1 values($value, 1)")
+      val res = sql("select try_subtract(c1, c2) from t1 order by c1")
+      checkSparkAnswerAndOperator(res)
+    }
+  }
+
+  test("try_multiply") {
+    val data = Seq((Integer.MAX_VALUE, 2), (1, 1))
+    withParquetTable(data, "tbl") {
+      checkSparkAnswerAndOperator("SELECT try_multiply(_1, _2) FROM tbl")
+    }
+    withTable("t1") {
+      val value = Long.MinValue
+      sql("create table t1(c1 long, c2 long) using parquet")
+      sql(s"insert into t1 values($value, 2)")
+      val res = sql("select try_multiply(c1, c2) from t1 order by c1")
+      checkSparkAnswerAndOperator(res)
+    }
+  }
+
+  test("try_divide") {
+    val data = Seq((Integer.MIN_VALUE, -1), (15121991, 0))
+    withParquetTable(data, "tbl") {
+      checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl")
+    }
+    withTable("t1") {
+      val value = Long.MinValue
+      sql("create table t1(c1 long, c2 long) using parquet")
+      sql(s"insert into t1 values($value, -1)")
+      val res = sql("select try_divide(c1, c2) from t1 order by c1")
+      checkSparkAnswerAndOperator(res)
+    }
   }
 
   test("dictionary arithmetic") {

From 4c8f5d67adfe0873e6acffc993d76911af8aba6d Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Thu, 7 Aug 2025 09:14:51 -0700
Subject: [PATCH 03/14] try_arithmetic_impl

---
 .../src/math_funcs/checked_arithmetic.rs      | 27 -------------------
 1 file changed, 27 deletions(-)

diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index f883595592..000d194eba 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -29,33 +29,6 @@ use num::traits::CheckedRem;
 use num::{CheckedAdd, CheckedDiv, CheckedMul, CheckedSub};
 use std::sync::Arc;
 
-fn checked_arithmetic_impl<T>(left_arr: &ArrayRef, right_arr: &ArrayRef, op_kind: &str) -> ArrayRef
-where
-    T: ArrowPrimitiveType,
-    T::Native: CheckedAdd + CheckedSub + CheckedMul + CheckedDiv + CheckedRem + Copy,
-{
-    let left = left_arr.as_primitive::<T>();
-    let right = right_arr.as_primitive::<T>();
-    let mut builder = PrimitiveBuilder::<T>::with_capacity(left.len());
-
-    for i in 0..left.len() {
-        if left.is_null(i) || right.is_null(i) {
-            builder.append_null();
-        } else {
-            let l = left.value(i);
-            let r = right.value(i);
-            let val = match op_kind {
-                "checked_add" => l.checked_add(&r),
-                "checked_sub" => l.checked_sub(&r),
-                "checked_mul" => l.checked_mul(&r),
-                _ => todo!("Unsupported operation: {}", op_kind),
-            };
-            builder.append_option(val);
-        }
-    }
-    Arc::new(builder.finish()) as ArrayRef
-}
-
 pub fn try_arithmetic_kernel<T>(
     left: &PrimitiveArray<T>,
     right: &PrimitiveArray<T>,
     op: &str,
 ) -> Result<ArrayRef, DataFusionError>

From b7b94f0b0b9a31d3e2fe8f53c1b73eb032546165 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Thu, 7 Aug 2025 09:20:32 -0700
Subject: [PATCH 04/14] try_arithmetic_impl

---
 native/spark-expr/src/math_funcs/checked_arithmetic.rs | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index 000d194eba..8a85bf790a 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -16,17 +16,13 @@
 // under the License.
 
 use arrow::array::{
-    Array, ArrowNativeTypeOp, Int32Array, Int32Builder, PrimitiveArray, PrimitiveBuilder,
+    Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder,
 };
 use arrow::array::{ArrayRef, AsArray};
 
-use arrow::compute::try_binary;
 use arrow::datatypes::{ArrowPrimitiveType, DataType, Int32Type, Int64Type};
-use arrow::error::ArrowError;
-use datafusion::common::{DataFusionError, ExprSchema};
+use datafusion::common::DataFusionError;
 use datafusion::physical_plan::ColumnarValue;
-use num::traits::CheckedRem;
-use num::{CheckedAdd, CheckedDiv, CheckedMul, CheckedSub};
 use std::sync::Arc;

From ce72d79612cbfa87d8a10baadab0d5428a58be36 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Thu, 7 Aug 2025 10:15:55 -0700
Subject: [PATCH 05/14] try_arithmetic_impl

---
 native/core/src/execution/planner.rs                   | 3 +--
 native/spark-expr/src/comet_scalar_funcs.rs            | 2 +-
 native/spark-expr/src/math_funcs/checked_arithmetic.rs | 4 +---
 3 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index ff8daf08cd..852cd6d515 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1061,7 +1061,7 @@ impl PhysicalPlanner {
                 Ok(Arc::new(Cast::new(
                     child,
                     data_type,
-                    SparkCastOptions::new_without_timezone(EvalMode::Try, false),
+                    SparkCastOptions::new_without_timezone(EvalMode::Legacy, false),
                 )))
             }
             (
@@ -1093,7 +1093,6 @@
             }
             _ => {
                 let data_type = return_type.map(to_arrow_datatype).unwrap();
-                print!("data type parsed : {}", data_type);
                 if !fail_on_overflow && data_type.is_integer() {
                     let op_str = match op {
                         DataFusionOperator::Plus => "checked_add",
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index eb0d51d92d..7a04e60066 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -45,7 +45,7 @@ macro_rules! make_comet_scalar_udf {
             $name.to_string(),
             Signature::variadic_any(Volatility::Immutable),
             $data_type.clone(),
-            Arc::new(move |args| $func(args, &$data_type /* &str */)),
+            Arc::new(move |args| $func(args, &$data_type)),
         );
         Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
     }};
diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index 8a85bf790a..67ddae8ee0 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{
-    Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder,
-};
+use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder};
 use arrow::array::{ArrayRef, AsArray};
 
 use arrow::datatypes::{ArrowPrimitiveType, DataType, Int32Type, Int64Type};
 use datafusion::common::DataFusionError;
 use datafusion::physical_plan::ColumnarValue;
 use std::sync::Arc;

From 956d6e93fd0b8d407afb9effabb763c0aae58c07 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Thu, 7 Aug 2025 14:35:48 -0700
Subject: [PATCH 06/14] try_arithmetic_impl

---
 native/core/src/execution/planner.rs        | 2 +-
 native/spark-expr/src/comet_scalar_funcs.rs | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 852cd6d515..a242c90ac0 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -286,7 +286,7 @@ impl PhysicalPlanner {
                     expr.return_type.as_ref(),
                     DataFusionOperator::Divide,
                     input_schema,
-                    _eval_mode != EvalMode::Try,
+                    false,
                 )
             }
             ExprStruct::IntegralDivide(expr) => {
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 7a04e60066..ec353f3bde 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -16,7 +16,6 @@
 // under the License.
 use crate::hash_funcs::*;
-use crate::math_funcs::checked_arithmetic;
 use crate::math_funcs::checked_arithmetic::{
     checked_add, checked_div, checked_mod, checked_mul, checked_sub,
 };

From cf3d30cc181f710403e237484385e4990e158e2e Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Thu, 7 Aug 2025 20:40:34 -0700
Subject: [PATCH 07/14] try_arithmetic_impl

---
 native/core/src/execution/planner.rs          | 33 ++++----
 native/spark-expr/src/comet_scalar_funcs.rs   |  7 +-
 .../src/math_funcs/checked_arithmetic.rs      | 56 ++++++++-----
 .../apache/comet/CometExpressionSuite.scala   | 78 ++++++++++---------
 4 files changed, 97 insertions(+), 77 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index a242c90ac0..dc9c09b98f 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -239,54 +239,54 @@ impl PhysicalPlanner {
         match spark_expr.expr_struct.as_ref().unwrap() {
             ExprStruct::Add(expr) => {
                 // TODO respect ANSI eval mode
-                // https://github.com/apache/datafusion-comet/issues/2021
                 // https://github.com/apache/datafusion-comet/issues/536
-                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
+                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
                     expr.return_type.as_ref(),
                     DataFusionOperator::Plus,
                     input_schema,
-                    _eval_mode != EvalMode::Try,
+                    eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::Subtract(expr) => {
-                // TODO respect eval mode
-                // https://github.com/apache/datafusion-comet/issues/2021
+                // TODO respect ANSI eval mode
                 // https://github.com/apache/datafusion-comet/issues/535
-                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
+                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
                     expr.return_type.as_ref(),
                     DataFusionOperator::Minus,
                     input_schema,
-                    _eval_mode != EvalMode::Try,
+                    eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::Multiply(expr) => {
+                // TODO respect ANSI eval mode
                 // https://github.com/apache/datafusion-comet/issues/534
-                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
+                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
                     expr.return_type.as_ref(),
                     DataFusionOperator::Multiply,
                     input_schema,
-                    _eval_mode != EvalMode::Try,
+                    eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::Divide(expr) => {
+                // TODO respect ANSI eval mode
                 // https://github.com/apache/datafusion-comet/issues/533
-                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
+                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
                     expr.return_type.as_ref(),
                     DataFusionOperator::Divide,
                     input_schema,
-                    false,
+                    eval_mode != EvalMode::Try,
                 )
             }
             ExprStruct::IntegralDivide(expr) => {
@@ -1005,7 +1005,7 @@ impl PhysicalPlanner {
         return_type: Option<&spark_expression::DataType>,
         op: DataFusionOperator,
         input_schema: SchemaRef,
-        fail_on_overflow: bool,
+        fail_on_overflow: bool
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         self.create_binary_expr_with_options(
             left,
@@ -1014,10 +1014,11 @@
             op,
             input_schema,
             BinaryExprOptions::default(),
-            fail_on_overflow,
+            fail_on_overflow
         )
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn create_binary_expr_with_options(
         &self,
         left: &Expr,
@@ -1026,7 +1027,7 @@
         op: DataFusionOperator,
         input_schema: SchemaRef,
         options: BinaryExprOptions,
-        fail_on_overflow: bool,
+        fail_on_overflow: bool
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         let left = self.create_expr(left, Arc::clone(&input_schema))?;
         let right = self.create_expr(right, Arc::clone(&input_schema))?;
@@ -1104,13 +1105,13 @@
                         }
                     };
                     let fun_expr = create_comet_physical_fun(
-                        &op_str,
+                        op_str,
                         data_type.clone(),
                         &self.session_ctx.state(),
                         None,
                     )?;
                     Ok(Arc::new(ScalarFunctionExpr::new(
-                        &op_str,
+                        op_str,
                         fun_expr,
                         vec![left, right],
                         Arc::new(Field::new(op_str, data_type, true)),
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index ec353f3bde..75f5689ad5 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -16,9 +16,7 @@
 // under the License.
 
 use crate::hash_funcs::*;
-use crate::math_funcs::checked_arithmetic::{
-    checked_add, checked_div, checked_mod, checked_mul, checked_sub,
-};
+use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub};
 use crate::math_funcs::modulo_expr::spark_modulo;
 use crate::{
     spark_array_repeat, spark_ceil, spark_date_add, spark_date_sub, spark_decimal_div,
@@ -130,9 +128,6 @@
         "checked_div" => {
             make_comet_scalar_udf!("checked_div", checked_div, data_type)
         }
-        "checked_mod" => {
-            make_comet_scalar_udf!("checked_mod", checked_mod, data_type)
-        }
         "murmur3_hash" => {
             let func = Arc::new(spark_murmur3_hash);
             make_comet_scalar_udf!("murmur3_hash", func, without data_type)
diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index 67ddae8ee0..30e6df209d 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -33,21 +33,44 @@ where
 {
     let len = left.len();
     let mut builder = PrimitiveBuilder::<T>::with_capacity(len);
-
-    for i in 0..len {
-        if left.is_null(i) || right.is_null(i) {
-            builder.append_null();
-        } else {
-            let l = left.value(i);
-            let r = right.value(i);
-
-            match op {
-                "checked_add" => builder.append_option(l.add_checked(r).ok()),
-                "checked_sub" => builder.append_option(l.sub_checked(r).ok()),
-                "checked_mul" => builder.append_option(l.mul_checked(r).ok()),
-                _ => todo!("Unsupported operation: {}", op),
+    match op {
+        "checked_add" => {
+            for i in 0..len {
+                if left.is_null(i) || right.is_null(i) {
+                    builder.append_null();
+                } else {
+                    builder.append_option(left.value(i).add_checked(right.value(i)).ok());
+                }
+            }
+        }
+        "checked_sub" => {
+            for i in 0..len {
+                if left.is_null(i) || right.is_null(i) {
+                    builder.append_null();
+                } else {
+                    builder.append_option(left.value(i).sub_checked(right.value(i)).ok());
+                }
+            }
+        }
+        "checked_mul" => {
+            for i in 0..len {
+                if left.is_null(i) || right.is_null(i) {
+                    builder.append_null();
+                } else {
+                    builder.append_option(left.value(i).mul_checked(right.value(i)).ok());
+                }
+            }
+        }
+        "checked_div" => {
+            for i in 0..len {
+                if left.is_null(i) || right.is_null(i) {
+                    builder.append_null();
+                } else {
+                    builder.append_option(left.value(i).div_checked(right.value(i)).ok());
+                }
+            }
+        }
+        _ => todo!("Unsupported operation: {}", op),
     }
 
     Ok(Arc::new(builder.finish()) as ArrayRef)
@@ -81,13 +104,6 @@
 pub fn checked_div(
     args: &[ColumnarValue],
     data_type: &DataType,
 ) -> Result<ColumnarValue, DataFusionError> {
     checked_arithmetic_internal(args, data_type, "checked_div")
 }
 
-pub fn checked_mod(
-    args: &[ColumnarValue],
-    data_type: &DataType,
-) -> Result<ColumnarValue, DataFusionError> {
-    checked_arithmetic_internal(args, data_type, "checked_mod")
-}
-
 fn checked_arithmetic_internal(
     args: &[ColumnarValue],
     data_type: &DataType,
     op: &str,
 ) -> Result<ColumnarValue, DataFusionError> {
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index fe82531d3f..e25a0b2dea 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -315,45 +315,50 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("try_add") {
-    val data = Seq((Integer.MAX_VALUE, 1), (1, 1))
+    val data = Seq((1, 1))
     withParquetTable(data, "tbl") {
-      checkSparkAnswerAndOperator("SELECT try_add(_1, _2) FROM tbl")
-    }
-    withTable("t1") {
-      val value = Long.MaxValue
-      sql("create table t1(c1 long, c2 long) using parquet")
-      sql(s"insert into t1 values($value, 1)")
-      val res = sql("select try_add(c1, c2) from t1 order by c1")
-      res.show()
-      checkSparkAnswerAndOperator(res)
+      checkSparkAnswerAndOperator(spark.sql("""
+        |SELECT
+        | try_add(2147483647, 1),
+        | try_add(-2147483648, -1),
+        | try_add(NULL, 5),
+        | try_add(5, NULL),
+        | try_add(9223372036854775807, 1),
+        | try_add(-9223372036854775808, -1)
+        | from tbl
+        | """.stripMargin))
     }
   }
 
   test("try_subtract") {
-    val data = Seq((Integer.MAX_VALUE, 1), (1, 1))
+    val data = Seq((1, 1))
     withParquetTable(data, "tbl") {
-      checkSparkAnswerAndOperator("SELECT try_subtract(_1, _2) FROM tbl")
-    }
-    withTable("t1") {
-      val value = Long.MinValue
-      sql("create table t1(c1 long, c2 long) using parquet")
-      sql(s"insert into t1 values($value, 1)")
-      val res = sql("select try_subtract(c1, c2) from t1 order by c1")
-      checkSparkAnswerAndOperator(res)
+      checkSparkAnswerAndOperator(spark.sql("""
+        |SELECT
+        | try_subtract(2147483647, -1),
+        | try_subtract(-2147483648, 1),
+        | try_subtract(NULL, 5),
+        | try_subtract(5, NULL),
+        | try_subtract(9223372036854775807, -1),
+        | try_subtract(-9223372036854775808, 1)
+        | FROM tbl
+      """.stripMargin))
     }
   }
 
   test("try_multiply") {
-    val data = Seq((Integer.MAX_VALUE, 2), (1, 1))
+    val data = Seq((1, 1))
    withParquetTable(data, "tbl") {
-      checkSparkAnswerAndOperator("SELECT try_multiply(_1, _2) FROM tbl")
-    }
-    withTable("t1") {
-      val value = Long.MinValue
-      sql("create table t1(c1 long, c2 long) using parquet")
-      sql(s"insert into t1 values($value, 2)")
-      val res = sql("select try_multiply(c1, c2) from t1 order by c1")
-      checkSparkAnswerAndOperator(res)
+      checkSparkAnswerAndOperator(spark.sql("""
+        |SELECT
+        | try_multiply(1073741824, 4),
+        | try_multiply(-1073741824, 4),
+        | try_multiply(NULL, 5),
+        | try_multiply(5, NULL),
+        | try_multiply(3037000499, 3037000500),
+        | try_multiply(-3037000499, 3037000500)
+        |FROM tbl
+      """.stripMargin))
     }
   }
 
@@ -361,13 +366,16 @@
   test("try_divide") {
     val data = Seq((Integer.MIN_VALUE, -1), (15121991, 0))
     withParquetTable(data, "tbl") {
       checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl")
-    }
-    withTable("t1") {
-      val value = Long.MinValue
-      sql("create table t1(c1 long, c2 long) using parquet")
-      sql(s"insert into t1 values($value, -1)")
-      val res = sql("select try_divide(c1, c2) from t1 order by c1")
-      checkSparkAnswerAndOperator(res)
+      checkSparkAnswerAndOperator("""
+        |SELECT
+        | try_divide(10, 0)
+        | try_divide(NULL, 5)
+        | try_divide(5, NULL)
+        | try_divide(-2147483648, -1)
+        | try_divide(-9223372036854775808, -1)
+        | try_divide(DECIMAL('9999999999999999999999999999'), 0.1)
+        | from tbl
+        |""".stripMargin)
     }
   }

From f6908a1ac647bbaa87a575054b5d9aa2e7358997 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Thu, 7 Aug 2025 23:26:43 -0700
Subject: [PATCH 08/14] try_arithmetic_impl

---
 native/core/src/execution/planner.rs          | 23 +++++++++----------
 .../apache/comet/CometExpressionSuite.scala   | 12 +++++-----
 2 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index dc9c09b98f..053b67a25b 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -247,7 +247,7 @@ impl PhysicalPlanner {
                     expr.return_type.as_ref(),
                     DataFusionOperator::Plus,
                     input_schema,
-                    eval_mode != EvalMode::Try,
+                    eval_mode,
                 )
             }
             ExprStruct::Subtract(expr) => {
@@ -260,7 +260,7 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Minus,
                     input_schema,
-                    eval_mode != EvalMode::Try,
+                    eval_mode,
                 )
             }
             ExprStruct::Multiply(expr) => {
@@ -273,7 +273,7 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Multiply,
                     input_schema,
-                    eval_mode != EvalMode::Try,
+                    eval_mode,
                 )
             }
             ExprStruct::Divide(expr) => {
@@ -286,14 +286,13 @@
                     expr.return_type.as_ref(),
                     DataFusionOperator::Divide,
                     input_schema,
-                    eval_mode != EvalMode::Try,
+                    eval_mode,
                 )
             }
             ExprStruct::IntegralDivide(expr) => {
                 // TODO respect eval mode
-                // https://github.com/apache/datafusion-comet/issues/2021
                 // https://github.com/apache/datafusion-comet/issues/533
-                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
+                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr_with_options(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
@@ -303,7 +302,7 @@
                     BinaryExprOptions {
                         is_integral_div: true,
                     },
-                    false,
+                    eval_mode,
                 )
             }
             ExprStruct::Remainder(expr) => {
@@ -1005,7 +1004,7 @@
         return_type: Option<&spark_expression::DataType>,
         op: DataFusionOperator,
         input_schema: SchemaRef,
-        fail_on_overflow: bool
+        eval_mode : EvalMode
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         self.create_binary_expr_with_options(
             left,
@@ -1014,7 +1013,7 @@
             op,
             input_schema,
             BinaryExprOptions::default(),
-            fail_on_overflow
+            eval_mode
         )
     }
 
@@ -1027,7 +1026,7 @@
         op: DataFusionOperator,
         input_schema: SchemaRef,
         options: BinaryExprOptions,
-        fail_on_overflow: bool
+        eval_mode : EvalMode
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         let left = self.create_expr(left, Arc::clone(&input_schema))?;
         let right = self.create_expr(right, Arc::clone(&input_schema))?;
@@ -1094,14 +1093,14 @@
             }
             _ => {
                 let data_type = return_type.map(to_arrow_datatype).unwrap();
-                if !fail_on_overflow && data_type.is_integer() {
+                if eval_mode == EvalMode::Try && data_type.is_integer() {
                     let op_str = match op {
                         DataFusionOperator::Plus => "checked_add",
                         DataFusionOperator::Minus => "checked_sub",
                         DataFusionOperator::Multiply => "checked_mul",
                         DataFusionOperator::Divide => "checked_div",
                         _ => {
-                            todo!("Operator with fail_on_overflow is yet to be implemented!");
+                            todo!("Operator yet to be implemented!");
                         }
                     };
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index e25a0b2dea..93651945f8 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -363,16 +363,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("try_divide") {
-    val data = Seq((Integer.MIN_VALUE, -1), (15121991, 0))
+    val data = Seq((15121991, 0))
     withParquetTable(data, "tbl") {
       checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl")
       checkSparkAnswerAndOperator("""
         |SELECT
-        | try_divide(10, 0)
-        | try_divide(NULL, 5)
-        | try_divide(5, NULL)
-        | try_divide(-2147483648, -1)
-        | try_divide(-9223372036854775808, -1)
+        | try_divide(10, 0),
+        | try_divide(NULL, 5),
+        | try_divide(5, NULL),
+        | try_divide(-2147483648, -1),
+        | try_divide(-9223372036854775808, -1),
         | try_divide(DECIMAL('9999999999999999999999999999'), 0.1)
         | from tbl
         |""".stripMargin)

From 8d6501d4be807ee4c3a53d656c73592607fb0e3e Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Fri, 8 Aug 2025 08:56:40 -0700
Subject: [PATCH 09/14] try_arithmetic_impl

---
 native/core/src/execution/planner.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 053b67a25b..9302097739 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1004,7 +1004,7 @@ impl PhysicalPlanner {
         return_type: Option<&spark_expression::DataType>,
         op: DataFusionOperator,
         input_schema: SchemaRef,
-        eval_mode : EvalMode
+        eval_mode: EvalMode
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         self.create_binary_expr_with_options(
             left,
@@ -1026,7 +1026,7 @@
         op: DataFusionOperator,
         input_schema: SchemaRef,
         options: BinaryExprOptions,
-        eval_mode : EvalMode
+        eval_mode: EvalMode
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         let left = self.create_expr(left, Arc::clone(&input_schema))?;

From d80a18a4de3709bffc62b9a190a2951b140984ca Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Fri, 8 Aug 2025 09:35:38 -0700
Subject: [PATCH 10/14] try_arithmetic_impl

---
 native/core/src/execution/planner.rs              |  6 +++---
 .../org/apache/comet/CometExpressionSuite.scala   | 15 +++++++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 9302097739..6c46d1b0ae 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1004,7 +1004,7 @@ impl PhysicalPlanner {
         return_type: Option<&spark_expression::DataType>,
         op: DataFusionOperator,
         input_schema: SchemaRef,
-        eval_mode: EvalMode
+        eval_mode: EvalMode,
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         self.create_binary_expr_with_options(
             left,
@@ -1013,7 +1013,7 @@
             op,
             input_schema,
             BinaryExprOptions::default(),
-            eval_mode
+            eval_mode,
         )
     }
 
@@ -1026,7 +1026,7 @@
         op: DataFusionOperator,
         input_schema: SchemaRef,
         options: BinaryExprOptions,
-        eval_mode: EvalMode
+        eval_mode: EvalMode,
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         let left = self.create_expr(left, Arc::clone(&input_schema))?;
         let right = self.create_expr(right, Arc::clone(&input_schema))?;
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 93651945f8..f2dcedb53a 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -379,6 +379,21 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("try_integral_divide overflow cases") {
+    val data = Seq((15121991, 0))
+    withParquetTable(data, "tbl") {
+      checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl")
+      checkSparkAnswerAndOperator("""
+        |SELECT try_divide(-128, -1),
+        |try_divide(-32768, -1),
+        |try_divide(-2147483648, -1),
+        |try_divide(-9223372036854775808, -1),
+        |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4)))
+        |from tbl
+        |""".stripMargin)
+    }
+  }
+
   test("dictionary arithmetic") {
     // TODO: test ANSI mode
     withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") {

From c8a212f9149fdb3e32450e25c247572dc8acd420 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Mon, 11 Aug 2025 08:40:00 -0700
Subject: [PATCH 11/14] try_arithmetic_impl

---
 native/spark-expr/src/math_funcs/checked_arithmetic.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index 30e6df209d..a4211ed955 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -70,7 +70,7 @@ where
                 }
             }
         }
-        _ => todo!("Unsupported operation: {}", op),
+        _ => Err!("Unsupported operation: {}", op),
     }
 
     Ok(Arc::new(builder.finish()) as ArrayRef)

From e0373d46d1636aee3e6137c2b05154d2532ac609 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Mon, 11 Aug 2025 09:23:18 -0700
Subject: [PATCH 12/14] try_arithmetic_impl

---
 native/spark-expr/src/math_funcs/checked_arithmetic.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index a4211ed955..8974b3827f 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::format;
 use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder};
 use arrow::array::{ArrayRef, AsArray};
 
@@ -70,7 +71,7 @@ where
                 }
             }
         }
-        _ => Err!("Unsupported operation: {}", op),
+        _ => Err(format!("Unsupported operation: {}",op)).unwrap(),
     }
 
     Ok(Arc::new(builder.finish()) as ArrayRef)

From dc01c9ff5ad60bab705c283e0058727e3e21d457 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Mon, 11 Aug 2025 09:24:45 -0700
Subject: [PATCH 13/14] try_arithmetic_impl

---
 native/spark-expr/src/math_funcs/checked_arithmetic.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index 8974b3827f..df44c74c68 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::format;
 use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder};
 use arrow::array::{ArrayRef, AsArray};
+use std::fmt::format;
 
 use arrow::datatypes::{ArrowPrimitiveType, DataType, Int32Type, Int64Type};
 use datafusion::common::DataFusionError;
@@ -71,7 +71,7 @@ where
                 }
             }
         }
-        _ => Err(format!("Unsupported operation: {}",op)).unwrap(),
+        _ => Err(format!("Unsupported operation: {}", op)).unwrap(),
     }
 
     Ok(Arc::new(builder.finish()) as ArrayRef)

From bd002fac6a8e141887ad5d91fe0070fc6d6418c3 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Mon, 11 Aug 2025 09:59:41 -0700
Subject: [PATCH 14/14] try_arithmetic_impl

---
 native/spark-expr/src/math_funcs/checked_arithmetic.rs | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
index df44c74c68..0312cdb0b0 100644
--- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs
+++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs
@@ -17,7 +17,6 @@
 
 use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder};
 use arrow::array::{ArrayRef, AsArray};
-use std::fmt::format;
 
 use arrow::datatypes::{ArrowPrimitiveType, DataType, Int32Type, Int64Type};
 use datafusion::common::DataFusionError;
@@ -71,7 +70,12 @@ where
                 }
             }
         }
-        _ => Err(format!("Unsupported operation: {}", op)).unwrap(),
+        _ => {
+            return Err(DataFusionError::Internal(format!(
+                "Unsupported operation: {:?}",
+                op
+            )))
+        }
     }
 
     Ok(Arc::new(builder.finish()) as ArrayRef)
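
Note: the core semantic this series implements is that Spark's try_add/try_subtract/try_multiply/try_divide return NULL on overflow (or division by zero) instead of raising, which try_arithmetic_kernel realizes via Arrow's add_checked/sub_checked/mul_checked/div_checked. The following is a minimal standalone Rust sketch of that null-on-overflow behavior; it uses only the standard library, and the function name and the Vec<Option<i32>> representation are illustrative stand-ins for the Arrow PrimitiveArray used in the patches, not code from this PR.

// Sketch only: mirrors the null-on-overflow semantics of try_arithmetic_kernel.
// Vec<Option<i32>> stands in for an Arrow PrimitiveArray<Int32Type>.
fn try_add_i32(left: &[Option<i32>], right: &[Option<i32>]) -> Vec<Option<i32>> {
    left.iter()
        .zip(right.iter())
        .map(|(l, r)| match (l, r) {
            // checked_add returns None on overflow, which becomes a NULL slot
            (Some(a), Some(b)) => a.checked_add(*b),
            // NULL inputs propagate, matching the builder.append_null() path
            _ => None,
        })
        .collect()
}

fn main() {
    let left = vec![Some(i32::MAX), Some(1), None];
    let right = vec![Some(1), Some(1), Some(5)];
    // Prints [None, Some(2), None]: the first slot overflows, analogous to
    // Spark's try_add(2147483647, 1) returning NULL in the suite above.
    println!("{:?}", try_add_i32(&left, &right));
}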