From 7f789c58d8833e5f11d76b79ba1ff8bde1b0e4c7 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Mon, 19 Jan 2026 13:17:01 +0530 Subject: [PATCH 1/3] perf: Optimize scalar performance for cot --- datafusion/functions/benches/cot.rs | 46 +++++++ datafusion/functions/src/math/cot.rs | 184 ++++++++++++++++++--------- 2 files changed, 169 insertions(+), 61 deletions(-) diff --git a/datafusion/functions/benches/cot.rs b/datafusion/functions/benches/cot.rs index c47198d4a6208..0511b5c595be4 100644 --- a/datafusion/functions/benches/cot.rs +++ b/datafusion/functions/benches/cot.rs @@ -27,6 +27,7 @@ use datafusion_functions::math::cot; use std::hint::black_box; use arrow::datatypes::{DataType, Field}; +use datafusion_common::ScalarValue; use datafusion_common::config::ConfigOptions; use std::sync::Arc; @@ -85,6 +86,51 @@ fn criterion_benchmark(c: &mut Criterion) { ) }) }); + + // Scalar benchmarks + let scalar_f32_args = + vec![ColumnarValue::Scalar(ScalarValue::Float32(Some(1.0)))]; + let scalar_f32_arg_fields = + vec![Field::new("a", DataType::Float32, false).into()]; + let return_field_f32 = Field::new("f", DataType::Float32, false).into(); + + c.bench_function(&format!("cot f32 scalar: {size}"), |b| { + b.iter(|| { + black_box( + cot_fn + .invoke_with_args(ScalarFunctionArgs { + args: scalar_f32_args.clone(), + arg_fields: scalar_f32_arg_fields.clone(), + number_rows: 1, + return_field: Arc::clone(&return_field_f32), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); + + let scalar_f64_args = + vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0)))]; + let scalar_f64_arg_fields = + vec![Field::new("a", DataType::Float64, false).into()]; + let return_field_f64 = Field::new("f", DataType::Float64, false).into(); + + c.bench_function(&format!("cot f64 scalar: {size}"), |b| { + b.iter(|| { + black_box( + cot_fn + .invoke_with_args(ScalarFunctionArgs { + args: scalar_f64_args.clone(), + arg_fields: scalar_f64_arg_fields.clone(), + 
number_rows: 1, + return_field: Arc::clone(&return_field_f64), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); } } diff --git a/datafusion/functions/src/math/cot.rs b/datafusion/functions/src/math/cot.rs index a0d7b02b68e5a..f6b2a8155326d 100644 --- a/datafusion/functions/src/math/cot.rs +++ b/datafusion/functions/src/math/cot.rs @@ -18,12 +18,12 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, AsArray}; +use arrow::array::AsArray; use arrow::datatypes::DataType::{Float32, Float64}; use arrow::datatypes::{DataType, Float32Type, Float64Type}; -use crate::utils::make_scalar_function; -use datafusion_common::{Result, exec_err}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, ScalarValue, internal_err}; use datafusion_expr::{ColumnarValue, Documentation, ScalarFunctionArgs}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use datafusion_macros::user_doc; @@ -96,24 +96,47 @@ impl ScalarUDFImpl for CotFunc { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(cot, vec![])(&args.args) - } -} - -///cot SQL function -fn cot(args: &[ArrayRef]) -> Result { - match args[0].data_type() { - Float64 => Ok(Arc::new( - args[0] - .as_primitive::() - .unary::<_, Float64Type>(|x: f64| compute_cot64(x)), - ) as ArrayRef), - Float32 => Ok(Arc::new( - args[0] - .as_primitive::() - .unary::<_, Float32Type>(|x: f32| compute_cot32(x)), - ) as ArrayRef), - other => exec_err!("Unsupported data type {other:?} for function cot"), + let return_type = args.return_type().clone(); + let [arg] = take_function_args(self.name(), args.args)?; + + match arg { + ColumnarValue::Scalar(scalar) => { + if scalar.is_null() { + return ColumnarValue::Scalar(ScalarValue::Null) + .cast_to(&return_type, None); + } + + match scalar { + ScalarValue::Float64(Some(v)) => Ok(ColumnarValue::Scalar( + ScalarValue::Float64(Some(compute_cot64(v))), + )), + 
ScalarValue::Float32(Some(v)) => Ok(ColumnarValue::Scalar( + ScalarValue::Float32(Some(compute_cot32(v))), + )), + _ => { + internal_err!( + "Unexpected scalar type for cot: {:?}", + scalar.data_type() + ) + } + } + } + ColumnarValue::Array(array) => match array.data_type() { + Float64 => Ok(ColumnarValue::Array(Arc::new( + array + .as_primitive::() + .unary::<_, Float64Type>(compute_cot64), + ))), + Float32 => Ok(ColumnarValue::Array(Arc::new( + array + .as_primitive::() + .unary::<_, Float32Type>(compute_cot32), + ))), + other => { + internal_err!("Unexpected data type {other:?} for function cot") + } + }, + } } } @@ -129,54 +152,93 @@ fn compute_cot64(x: f64) -> f64 { #[cfg(test)] mod test { - use crate::math::cot::cot; + use std::sync::Arc; + use arrow::array::{ArrayRef, Float32Array, Float64Array}; + use arrow::datatypes::{DataType, Field}; use datafusion_common::cast::{as_float32_array, as_float64_array}; - use std::sync::Arc; + use datafusion_common::config::ConfigOptions; + use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; + + use crate::math::cot::CotFunc; #[test] fn test_cot_f32() { - let args: Vec = - vec![Arc::new(Float32Array::from(vec![12.1, 30.0, 90.0, -30.0]))]; - let result = cot(&args).expect("failed to initialize function cot"); - let floats = - as_float32_array(&result).expect("failed to initialize function cot"); - - let expected = Float32Array::from(vec![ - -1.986_460_4, - -0.156_119_96, - -0.501_202_8, - 0.156_119_96, - ]); - - let eps = 1e-6; - assert_eq!(floats.len(), 4); - assert!((floats.value(0) - expected.value(0)).abs() < eps); - assert!((floats.value(1) - expected.value(1)).abs() < eps); - assert!((floats.value(2) - expected.value(2)).abs() < eps); - assert!((floats.value(3) - expected.value(3)).abs() < eps); + let array = Arc::new(Float32Array::from(vec![12.1, 30.0, 90.0, -30.0])); + let arg_fields = vec![Field::new("a", DataType::Float32, false).into()]; + let args = ScalarFunctionArgs { + args: 
vec![ColumnarValue::Array(Arc::clone(&array) as ArrayRef)], + arg_fields, + number_rows: array.len(), + return_field: Field::new("f", DataType::Float32, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }; + let result = CotFunc::new() + .invoke_with_args(args) + .expect("failed to initialize function cot"); + + match result { + ColumnarValue::Array(arr) => { + let floats = as_float32_array(&arr) + .expect("failed to convert result to a Float32Array"); + + let expected = Float32Array::from(vec![ + -1.986_460_4, + -0.156_119_96, + -0.501_202_8, + 0.156_119_96, + ]); + + let eps = 1e-6; + assert_eq!(floats.len(), 4); + assert!((floats.value(0) - expected.value(0)).abs() < eps); + assert!((floats.value(1) - expected.value(1)).abs() < eps); + assert!((floats.value(2) - expected.value(2)).abs() < eps); + assert!((floats.value(3) - expected.value(3)).abs() < eps); + } + ColumnarValue::Scalar(_) => { + panic!("Expected an array value") + } + } } #[test] fn test_cot_f64() { - let args: Vec = - vec![Arc::new(Float64Array::from(vec![12.1, 30.0, 90.0, -30.0]))]; - let result = cot(&args).expect("failed to initialize function cot"); - let floats = - as_float64_array(&result).expect("failed to initialize function cot"); - - let expected = Float64Array::from(vec![ - -1.986_458_685_881_4, - -0.156_119_952_161_6, - -0.501_202_783_380_1, - 0.156_119_952_161_6, - ]); - - let eps = 1e-12; - assert_eq!(floats.len(), 4); - assert!((floats.value(0) - expected.value(0)).abs() < eps); - assert!((floats.value(1) - expected.value(1)).abs() < eps); - assert!((floats.value(2) - expected.value(2)).abs() < eps); - assert!((floats.value(3) - expected.value(3)).abs() < eps); + let array = Arc::new(Float64Array::from(vec![12.1, 30.0, 90.0, -30.0])); + let arg_fields = vec![Field::new("a", DataType::Float64, false).into()]; + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::clone(&array) as ArrayRef)], + arg_fields, + number_rows: array.len(), + 
return_field: Field::new("f", DataType::Float64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }; + let result = CotFunc::new() + .invoke_with_args(args) + .expect("failed to initialize function cot"); + + match result { + ColumnarValue::Array(arr) => { + let floats = as_float64_array(&arr) + .expect("failed to convert result to a Float64Array"); + + let expected = Float64Array::from(vec![ + -1.986_458_685_881_4, + -0.156_119_952_161_6, + -0.501_202_783_380_1, + 0.156_119_952_161_6, + ]); + + let eps = 1e-12; + assert_eq!(floats.len(), 4); + assert!((floats.value(0) - expected.value(0)).abs() < eps); + assert!((floats.value(1) - expected.value(1)).abs() < eps); + assert!((floats.value(2) - expected.value(2)).abs() < eps); + assert!((floats.value(3) - expected.value(3)).abs() < eps); + } + ColumnarValue::Scalar(_) => { + panic!("Expected an array value") + } + } } } From 6d3bfd30a460eec91b1ea585cda63b3cb44c3994 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 20 Jan 2026 20:29:07 +0530 Subject: [PATCH 2/3] moved return type to avoid clone & scalar benchmark outside size loop --- datafusion/functions/benches/cot.rs | 85 ++++++++++++++-------------- datafusion/functions/src/math/cot.rs | 4 +- 2 files changed, 44 insertions(+), 45 deletions(-) diff --git a/datafusion/functions/benches/cot.rs b/datafusion/functions/benches/cot.rs index 0511b5c595be4..061d14cbf0655 100644 --- a/datafusion/functions/benches/cot.rs +++ b/datafusion/functions/benches/cot.rs @@ -33,6 +33,9 @@ use std::sync::Arc; fn criterion_benchmark(c: &mut Criterion) { let cot_fn = cot(); + let config_options = Arc::new(ConfigOptions::default()); + + // Array benchmarks - run for different sizes for size in [1024, 4096, 8192] { let f32_array = Arc::new(create_primitive_array::(size, 0.2)); let f32_args = vec![ColumnarValue::Array(f32_array)]; @@ -43,7 +46,6 @@ fn criterion_benchmark(c: &mut Criterion) { Field::new(format!("arg_{idx}"), arg.data_type(), true).into() }) 
.collect::>(); - let config_options = Arc::new(ConfigOptions::default()); c.bench_function(&format!("cot f32 array: {size}"), |b| { b.iter(|| { @@ -60,6 +62,7 @@ fn criterion_benchmark(c: &mut Criterion) { ) }) }); + let f64_array = Arc::new(create_primitive_array::(size, 0.2)); let f64_args = vec![ColumnarValue::Array(f64_array)]; let arg_fields = f64_args @@ -86,52 +89,48 @@ fn criterion_benchmark(c: &mut Criterion) { ) }) }); + } - // Scalar benchmarks - let scalar_f32_args = - vec![ColumnarValue::Scalar(ScalarValue::Float32(Some(1.0)))]; - let scalar_f32_arg_fields = - vec![Field::new("a", DataType::Float32, false).into()]; - let return_field_f32 = Field::new("f", DataType::Float32, false).into(); + // Scalar benchmarks - run only once since size doesn't affect scalar performance + let scalar_f32_args = vec![ColumnarValue::Scalar(ScalarValue::Float32(Some(1.0)))]; + let scalar_f32_arg_fields = vec![Field::new("a", DataType::Float32, false).into()]; + let return_field_f32 = Field::new("f", DataType::Float32, false).into(); - c.bench_function(&format!("cot f32 scalar: {size}"), |b| { - b.iter(|| { - black_box( - cot_fn - .invoke_with_args(ScalarFunctionArgs { - args: scalar_f32_args.clone(), - arg_fields: scalar_f32_arg_fields.clone(), - number_rows: 1, - return_field: Arc::clone(&return_field_f32), - config_options: Arc::clone(&config_options), - }) - .unwrap(), - ) - }) - }); + c.bench_function("cot f32 scalar", |b| { + b.iter(|| { + black_box( + cot_fn + .invoke_with_args(ScalarFunctionArgs { + args: scalar_f32_args.clone(), + arg_fields: scalar_f32_arg_fields.clone(), + number_rows: 1, + return_field: Arc::clone(&return_field_f32), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); - let scalar_f64_args = - vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0)))]; - let scalar_f64_arg_fields = - vec![Field::new("a", DataType::Float64, false).into()]; - let return_field_f64 = Field::new("f", DataType::Float64, false).into(); + 
let scalar_f64_args = vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0)))]; + let scalar_f64_arg_fields = vec![Field::new("a", DataType::Float64, false).into()]; + let return_field_f64 = Field::new("f", DataType::Float64, false).into(); - c.bench_function(&format!("cot f64 scalar: {size}"), |b| { - b.iter(|| { - black_box( - cot_fn - .invoke_with_args(ScalarFunctionArgs { - args: scalar_f64_args.clone(), - arg_fields: scalar_f64_arg_fields.clone(), - number_rows: 1, - return_field: Arc::clone(&return_field_f64), - config_options: Arc::clone(&config_options), - }) - .unwrap(), - ) - }) - }); - } + c.bench_function("cot f64 scalar", |b| { + b.iter(|| { + black_box( + cot_fn + .invoke_with_args(ScalarFunctionArgs { + args: scalar_f64_args.clone(), + arg_fields: scalar_f64_arg_fields.clone(), + number_rows: 1, + return_field: Arc::clone(&return_field_f64), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/functions/src/math/cot.rs b/datafusion/functions/src/math/cot.rs index f6b2a8155326d..8482ac566b07f 100644 --- a/datafusion/functions/src/math/cot.rs +++ b/datafusion/functions/src/math/cot.rs @@ -96,14 +96,14 @@ impl ScalarUDFImpl for CotFunc { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let return_type = args.return_type().clone(); + let return_field = args.return_field; let [arg] = take_function_args(self.name(), args.args)?; match arg { ColumnarValue::Scalar(scalar) => { if scalar.is_null() { return ColumnarValue::Scalar(ScalarValue::Null) - .cast_to(&return_type, None); + .cast_to(return_field.data_type(), None); } match scalar { From 131448473f28efbdd3b908142ef24eb8216e8f98 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 20 Jan 2026 20:42:44 +0530 Subject: [PATCH 3/3] added unit tests for scalar path --- datafusion/functions/src/math/cot.rs | 119 +++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git 
a/datafusion/functions/src/math/cot.rs b/datafusion/functions/src/math/cot.rs index 8482ac566b07f..1f67ef713833f 100644 --- a/datafusion/functions/src/math/cot.rs +++ b/datafusion/functions/src/math/cot.rs @@ -156,6 +156,7 @@ mod test { use arrow::array::{ArrayRef, Float32Array, Float64Array}; use arrow::datatypes::{DataType, Field}; + use datafusion_common::ScalarValue; use datafusion_common::cast::{as_float32_array, as_float64_array}; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; @@ -241,4 +242,122 @@ mod test { } } } + + #[test] + fn test_cot_scalar_f64() { + let arg_fields = vec![Field::new("a", DataType::Float64, false).into()]; + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0)))], + arg_fields, + number_rows: 1, + return_field: Field::new("f", DataType::Float64, false).into(), + config_options: Arc::new(ConfigOptions::default()), + }; + let result = CotFunc::new() + .invoke_with_args(args) + .expect("cot scalar should succeed"); + + match result { + ColumnarValue::Scalar(ScalarValue::Float64(Some(v))) => { + // cot(1.0) = 1/tan(1.0) ≈ 0.6420926159343306 + let expected = 1.0_f64 / 1.0_f64.tan(); + assert!((v - expected).abs() < 1e-12); + } + _ => panic!("Expected Float64 scalar"), + } + } + + #[test] + fn test_cot_scalar_f32() { + let arg_fields = vec![Field::new("a", DataType::Float32, false).into()]; + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Float32(Some(1.0)))], + arg_fields, + number_rows: 1, + return_field: Field::new("f", DataType::Float32, false).into(), + config_options: Arc::new(ConfigOptions::default()), + }; + let result = CotFunc::new() + .invoke_with_args(args) + .expect("cot scalar should succeed"); + + match result { + ColumnarValue::Scalar(ScalarValue::Float32(Some(v))) => { + let expected = 1.0_f32 / 1.0_f32.tan(); + assert!((v - expected).abs() < 1e-6); + } + _ => panic!("Expected 
Float32 scalar"), + } + } + + #[test] + fn test_cot_scalar_null() { + let arg_fields = vec![Field::new("a", DataType::Float64, true).into()]; + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Float64(None))], + arg_fields, + number_rows: 1, + return_field: Field::new("f", DataType::Float64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }; + let result = CotFunc::new() + .invoke_with_args(args) + .expect("cot null should succeed"); + + match result { + ColumnarValue::Scalar(scalar) => { + assert!(scalar.is_null()); + } + _ => panic!("Expected scalar result"), + } + } + + #[test] + fn test_cot_scalar_zero() { + let arg_fields = vec![Field::new("a", DataType::Float64, false).into()]; + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(0.0)))], + arg_fields, + number_rows: 1, + return_field: Field::new("f", DataType::Float64, false).into(), + config_options: Arc::new(ConfigOptions::default()), + }; + let result = CotFunc::new() + .invoke_with_args(args) + .expect("cot zero should succeed"); + + match result { + ColumnarValue::Scalar(ScalarValue::Float64(Some(v))) => { + // cot(0) = 1/tan(0) = infinity + assert!(v.is_infinite()); + } + _ => panic!("Expected Float64 scalar"), + } + } + + #[test] + fn test_cot_scalar_pi() { + let arg_fields = vec![Field::new("a", DataType::Float64, false).into()]; + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Float64(Some( + std::f64::consts::PI, + )))], + arg_fields, + number_rows: 1, + return_field: Field::new("f", DataType::Float64, false).into(), + config_options: Arc::new(ConfigOptions::default()), + }; + let result = CotFunc::new() + .invoke_with_args(args) + .expect("cot pi should succeed"); + + match result { + ColumnarValue::Scalar(ScalarValue::Float64(Some(v))) => { + // cot(PI) = 1/tan(PI) - very large negative number due to floating point + let expected = 1.0_f64 / std::f64::consts::PI.tan(); + 
assert!((v - expected).abs() < 1e-6); + } + _ => panic!("Expected Float64 scalar"), + } + } }