diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 4ecd7a597814..939fcfd11fba 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -315,3 +315,8 @@ required-features = ["unicode_expressions"] harness = false name = "factorial" required-features = ["math_expressions"] + +[[bench]] +harness = false +name = "floor_ceil" +required-features = ["math_expressions"] diff --git a/datafusion/functions/benches/floor_ceil.rs b/datafusion/functions/benches/floor_ceil.rs new file mode 100644 index 000000000000..7995779ab536 --- /dev/null +++ b/datafusion/functions/benches/floor_ceil.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::datatypes::{DataType, Field, Float64Type}; +use arrow::util::bench_util::create_primitive_array; +use criterion::{Criterion, SamplingMode, criterion_group, criterion_main}; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_functions::math::{ceil, floor}; +use std::hint::black_box; +use std::sync::Arc; +use std::time::Duration; + +fn criterion_benchmark(c: &mut Criterion) { + let floor_fn = floor(); + let ceil_fn = ceil(); + let config_options = Arc::new(ConfigOptions::default()); + + for size in [1024, 4096, 8192] { + let mut group = c.benchmark_group(format!("floor_ceil size={size}")); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(10); + group.measurement_time(Duration::from_secs(10)); + + // Float64 array benchmark + let f64_array = Arc::new(create_primitive_array::(size, 0.1)); + let batch_len = f64_array.len(); + let f64_args = vec![ColumnarValue::Array(f64_array)]; + + group.bench_function("floor_f64_array", |b| { + b.iter(|| { + let args_cloned = f64_args.clone(); + black_box( + floor_fn + .invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: vec![ + Field::new("a", DataType::Float64, true).into(), + ], + number_rows: batch_len, + return_field: Field::new("f", DataType::Float64, true).into(), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); + + group.bench_function("ceil_f64_array", |b| { + b.iter(|| { + let args_cloned = f64_args.clone(); + black_box( + ceil_fn + .invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: vec![ + Field::new("a", DataType::Float64, true).into(), + ], + number_rows: batch_len, + return_field: Field::new("f", DataType::Float64, true).into(), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); + + // Scalar benchmark (the optimization we added) + let scalar_args = vec![ColumnarValue::Scalar(ScalarValue::Float64(Some( + std::f64::consts::PI, + )))]; + + group.bench_function("floor_f64_scalar", |b| { + b.iter(|| { + let args_cloned = scalar_args.clone(); + black_box( + floor_fn + .invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: vec![ + Field::new("a", DataType::Float64, false).into(), + ], + number_rows: 1, + return_field: Field::new("f", DataType::Float64, false) + .into(), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); + + group.bench_function("ceil_f64_scalar", |b| { + b.iter(|| { + let args_cloned = scalar_args.clone(); + black_box( + ceil_fn + .invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: vec![ + Field::new("a", DataType::Float64, false).into(), + ], + number_rows: 1, + return_field: Field::new("f", DataType::Float64, false) + .into(), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); + + group.finish(); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/math/ceil.rs b/datafusion/functions/src/math/ceil.rs index 501741002f96..5961b3cb27fe 100644 --- a/datafusion/functions/src/math/ceil.rs +++ b/datafusion/functions/src/math/ceil.rs @@ -95,8 +95,35 @@ impl ScalarUDFImpl for CeilFunc { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let args = ColumnarValue::values_to_arrays(&args.args)?; - let value = &args[0]; + let arg = &args.args[0]; + + // Scalar fast path for float types - avoid array conversion overhead entirely + if let ColumnarValue::Scalar(scalar) = arg { + match scalar { + ScalarValue::Float64(v) => { + return Ok(ColumnarValue::Scalar(ScalarValue::Float64( + v.map(f64::ceil), + ))); + } + ScalarValue::Float32(v) => { + return Ok(ColumnarValue::Scalar(ScalarValue::Float32( + v.map(f32::ceil), + ))); + } + ScalarValue::Null => { + return Ok(ColumnarValue::Scalar(ScalarValue::Float64(None))); + } + // For decimals: convert to array of size 1, process, then extract scalar + // This ensures we don't expand the array while reusing overflow validation + _ => {} + } + } + + // Track if input was a scalar to convert back at the end + let is_scalar = matches!(arg, ColumnarValue::Scalar(_)); + + // Array path (also handles decimal scalars converted to size-1 arrays) + let value = arg.to_array(args.number_rows)?; let result: ArrayRef = match value.data_type() { DataType::Float64 => Arc::new( @@ -114,7 +141,7 @@ impl ScalarUDFImpl for CeilFunc { } DataType::Decimal32(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -123,7 +150,7 @@ impl ScalarUDFImpl for CeilFunc { } DataType::Decimal64(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -132,7 +159,7 @@ impl ScalarUDFImpl for CeilFunc { } DataType::Decimal128(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -141,7 +168,7 @@ impl ScalarUDFImpl for CeilFunc { } DataType::Decimal256(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -156,7 +183,12 @@ impl ScalarUDFImpl for CeilFunc { } }; - Ok(ColumnarValue::Array(result)) + // If input was a scalar, convert result back to scalar + if is_scalar { + ScalarValue::try_from_array(&result, 0).map(ColumnarValue::Scalar) + } else { + Ok(ColumnarValue::Array(result)) + } } fn output_ordering(&self, input: &[ExprProperties]) -> Result { diff --git a/datafusion/functions/src/math/floor.rs b/datafusion/functions/src/math/floor.rs index 221e58e1e7a7..d69f9b9d86fe 100644 --- a/datafusion/functions/src/math/floor.rs +++ b/datafusion/functions/src/math/floor.rs @@ -95,8 +95,35 @@ impl ScalarUDFImpl for FloorFunc { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let args = ColumnarValue::values_to_arrays(&args.args)?; - let value = &args[0]; + let arg = &args.args[0]; + + // Scalar fast path for float types - avoid array conversion overhead entirely + if let ColumnarValue::Scalar(scalar) = arg { + match scalar { + ScalarValue::Float64(v) => { + return Ok(ColumnarValue::Scalar(ScalarValue::Float64( + v.map(f64::floor), + ))); + } + ScalarValue::Float32(v) => { + return Ok(ColumnarValue::Scalar(ScalarValue::Float32( + v.map(f32::floor), + ))); + } + ScalarValue::Null => { + return Ok(ColumnarValue::Scalar(ScalarValue::Float64(None))); + } + // For decimals: convert to array of size 1, process, then extract scalar + // This ensures we don't expand the array while reusing overflow validation + _ => {} + } + } + + // Track if input was a scalar to convert back at the end + let is_scalar = matches!(arg, ColumnarValue::Scalar(_)); + + // Array path (also handles decimal scalars converted to size-1 arrays) + let value = arg.to_array(args.number_rows)?; let result: ArrayRef = match value.data_type() { DataType::Float64 => Arc::new( @@ -114,7 +141,7 @@ impl ScalarUDFImpl for FloorFunc { } DataType::Decimal32(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -123,7 +150,7 @@ impl ScalarUDFImpl for FloorFunc { } DataType::Decimal64(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -132,7 +159,7 @@ impl ScalarUDFImpl for FloorFunc { } DataType::Decimal128(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -141,7 +168,7 @@ impl ScalarUDFImpl for FloorFunc { } DataType::Decimal256(precision, scale) => { apply_decimal_op::( - value, + &value, *precision, *scale, self.name(), @@ -156,7 +183,12 @@ impl ScalarUDFImpl for FloorFunc { } }; - Ok(ColumnarValue::Array(result)) + // If input was a scalar, convert result back to scalar + if is_scalar { + ScalarValue::try_from_array(&result, 0).map(ColumnarValue::Scalar) + } else { + Ok(ColumnarValue::Array(result)) + } } fn output_ordering(&self, input: &[ExprProperties]) -> Result {