From dc48533528d8559d6f34c24e8913511a3799238b Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 29 Mar 2025 09:15:43 -0400 Subject: [PATCH 1/6] ScalarUDF via FFI would break with nullary type inputs. We could not coerce empty vector of arguments to nullary due to check. This also exposes return_type_from_args instead of just return_type that was causing some UDFs to fail. Added unit tests and moved around FFI test modules a little. --- .../expr/src/type_coercion/functions.rs | 2 +- datafusion/ffi/src/tests/mod.rs | 6 +- datafusion/ffi/src/tests/udf_udaf_udwf.rs | 11 +- datafusion/ffi/src/tests/utils.rs | 70 +++++++++ datafusion/ffi/src/{udf.rs => udf/mod.rs} | 47 +++++- datafusion/ffi/src/udf/return_info.rs | 53 +++++++ datafusion/ffi/src/udf/return_type_args.rs | 142 ++++++++++++++++++ datafusion/ffi/tests/ffi_integration.rs | 116 +------------- datafusion/ffi/tests/ffi_udf.rs | 104 +++++++++++++ 9 files changed, 434 insertions(+), 117 deletions(-) create mode 100644 datafusion/ffi/src/tests/utils.rs rename datafusion/ffi/src/{udf.rs => udf/mod.rs} (87%) create mode 100644 datafusion/ffi/src/udf/return_info.rs create mode 100644 datafusion/ffi/src/udf/return_type_args.rs create mode 100644 datafusion/ffi/tests/ffi_udf.rs diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 0ec017bdc27f6..a2051c55999dc 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -49,7 +49,7 @@ pub fn data_types_with_scalar_udf( let signature = func.signature(); let type_signature = &signature.type_signature; - if current_types.is_empty() { + if current_types.is_empty() && type_signature != &TypeSignature::UserDefined { if type_signature.supports_zero_argument() { return Ok(vec![]); } else if type_signature.used_to_support_zero_arguments() { diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index 4b4a29276d9a8..c7a9816431e10 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -37,12 +37,13 @@ use datafusion::{ common::record_batch, }; use sync_provider::create_sync_table_provider; -use udf_udaf_udwf::create_ffi_abs_func; +use udf_udaf_udwf::{create_ffi_abs_func, create_ffi_random_func}; mod async_provider; pub mod catalog; mod sync_provider; mod udf_udaf_udwf; +pub mod utils; #[repr(C)] #[derive(StableAbi)] @@ -60,6 +61,8 @@ pub struct ForeignLibraryModule { /// Create a scalar UDF pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF, + pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF, + pub version: extern "C" fn() -> u64, } @@ -105,6 +108,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { create_catalog: create_catalog_provider, create_table: construct_table_provider, create_scalar_udf: create_ffi_abs_func, + create_nullary_udf: create_ffi_random_func, version: super::version, } .leak_into_prefix() diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs b/datafusion/ffi/src/tests/udf_udaf_udwf.rs index e8a13aac13081..b40bec762bd71 100644 --- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs +++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs @@ -16,7 +16,10 @@ // under the License. use crate::udf::FFI_ScalarUDF; -use datafusion::{functions::math::abs::AbsFunc, logical_expr::ScalarUDF}; +use datafusion::{ + functions::math::{abs::AbsFunc, random::RandomFunc}, + logical_expr::ScalarUDF, +}; use std::sync::Arc; @@ -25,3 +28,9 @@ pub(crate) extern "C" fn create_ffi_abs_func() -> FFI_ScalarUDF { udf.into() } + +pub(crate) extern "C" fn create_ffi_random_func() -> FFI_ScalarUDF { + let udf: Arc = Arc::new(RandomFunc::new().into()); + + udf.into() +} diff --git a/datafusion/ffi/src/tests/utils.rs b/datafusion/ffi/src/tests/utils.rs new file mode 100644 index 0000000000000..f01f5253126a9 --- /dev/null +++ b/datafusion/ffi/src/tests/utils.rs @@ -0,0 +1,70 @@ +use crate::tests::ForeignLibraryModuleRef; +use abi_stable::library::RootModule; +use datafusion::error::{DataFusionError, Result}; +use std::path::Path; + +/// Compute the path to the library. It would be preferable to simply use +/// abi_stable::library::development_utils::compute_library_path however +/// our current CI pipeline has a `ci` profile that we need to use to +/// find the library. +pub fn compute_library_path( + target_path: &Path, +) -> std::io::Result { + let debug_dir = target_path.join("debug"); + let release_dir = target_path.join("release"); + let ci_dir = target_path.join("ci"); + + let debug_path = M::get_library_path(&debug_dir.join("deps")); + let release_path = M::get_library_path(&release_dir.join("deps")); + let ci_path = M::get_library_path(&ci_dir.join("deps")); + + let all_paths = vec![ + (debug_dir.clone(), debug_path), + (release_dir, release_path), + (ci_dir, ci_path), + ]; + + let best_path = all_paths + .into_iter() + .filter(|(_, path)| path.exists()) + .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok()) + .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok()) + .max_by_key(|(_, date)| *date) + .map(|(dir, _)| dir) + .unwrap_or(debug_dir); + + Ok(best_path) +} + +pub fn get_module() -> Result { + let expected_version = crate::version(); + + let crate_root = Path::new(env!("CARGO_MANIFEST_DIR")); + let target_dir = crate_root + .parent() + .expect("Failed to find crate parent") + .parent() + .expect("Failed to find workspace root") + .join("target"); + + // Find the location of the library. This is specific to the build environment, + // so you will need to change the approach here based on your use case. + // let target: &std::path::Path = "../../../../target/".as_ref(); + let library_path = + compute_library_path::(target_dir.as_path()) + .map_err(|e| DataFusionError::External(Box::new(e)))? + .join("deps"); + + // Load the module + let module = ForeignLibraryModuleRef::load_from_directory(&library_path) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + assert_eq!( + module + .version() + .expect("Unable to call version on FFI module")(), + expected_version + ); + + Ok(module) +} diff --git a/datafusion/ffi/src/udf.rs b/datafusion/ffi/src/udf/mod.rs similarity index 87% rename from datafusion/ffi/src/udf.rs rename to datafusion/ffi/src/udf/mod.rs index bbc9cf936ceec..706b9fabedcb4 100644 --- a/datafusion/ffi/src/udf.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -29,7 +29,9 @@ use arrow::{ }; use datafusion::{ error::DataFusionError, - logical_expr::type_coercion::functions::data_types_with_scalar_udf, + logical_expr::{ + type_coercion::functions::data_types_with_scalar_udf, ReturnInfo, ReturnTypeArgs, + }, }; use datafusion::{ error::Result, @@ -37,6 +39,10 @@ use datafusion::{ ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, }, }; +use return_info::FFI_ReturnInfo; +use return_type_args::{ + FFI_ReturnTypeArgs, ForeignReturnTypeArgs, ForeignReturnTypeArgsOwned, +}; use crate::{ arrow_wrappers::{WrappedArray, WrappedSchema}, @@ -45,6 +51,9 @@ use crate::{ volatility::FFI_Volatility, }; +pub mod return_info; +pub mod return_type_args; + /// A stable struct for sharing a [`ScalarUDF`] across FFI boundaries. #[repr(C)] #[derive(Debug, StableAbi)] @@ -66,6 +75,14 @@ pub struct FFI_ScalarUDF { arg_types: RVec, ) -> RResult, + /// Determines the return info of the underlying [`ScalarUDF`]. Either this + /// or return_type may be implemented on a UDF. + pub return_type_from_args: unsafe extern "C" fn( + udf: &Self, + args: FFI_ReturnTypeArgs, + ) + -> RResult, + /// Execute the underlying [`ScalarUDF`] and return the result as a `FFI_ArrowArray` /// within an AbiStable wrapper. pub invoke_with_args: unsafe extern "C" fn( @@ -123,6 +140,23 @@ unsafe extern "C" fn return_type_fn_wrapper( rresult!(return_type) } +unsafe extern "C" fn return_type_from_args_fn_wrapper( + udf: &FFI_ScalarUDF, + args: FFI_ReturnTypeArgs, +) -> RResult { + let private_data = udf.private_data as *const ScalarUDFPrivateData; + let udf = &(*private_data).udf; + + let args: ForeignReturnTypeArgsOwned = rresult_return!((&args).try_into()); + let args_ref: ForeignReturnTypeArgs = (&args).into(); + + let return_type = udf + .return_type_from_args((&args_ref).into()) + .and_then(FFI_ReturnInfo::try_from); + + rresult!(return_type) +} + unsafe extern "C" fn coerce_types_fn_wrapper( udf: &FFI_ScalarUDF, arg_types: RVec, @@ -209,6 +243,7 @@ impl From> for FFI_ScalarUDF { short_circuits, invoke_with_args: invoke_with_args_fn_wrapper, return_type: return_type_fn_wrapper, + return_type_from_args: return_type_from_args_fn_wrapper, coerce_types: coerce_types_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, @@ -281,6 +316,16 @@ impl ScalarUDFImpl for ForeignScalarUDF { result.and_then(|r| (&r.0).try_into().map_err(DataFusionError::from)) } + fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result { + let args: FFI_ReturnTypeArgs = args.try_into()?; + + let result = unsafe { (self.udf.return_type_from_args)(&self.udf, args) }; + + let result = df_result!(result); + + result.and_then(|r| r.try_into()) + } + fn invoke_with_args(&self, invoke_args: ScalarFunctionArgs) -> Result { let ScalarFunctionArgs { args, diff --git a/datafusion/ffi/src/udf/return_info.rs b/datafusion/ffi/src/udf/return_info.rs new file mode 100644 index 0000000000000..27086088c5d6b --- /dev/null +++ b/datafusion/ffi/src/udf/return_info.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use abi_stable::StableAbi; +use arrow::{datatypes::DataType, ffi::FFI_ArrowSchema}; +use datafusion::{error::DataFusionError, logical_expr::ReturnInfo}; + +use crate::arrow_wrappers::WrappedSchema; + +/// A stable struct for sharing a [`ReturnTypeArgs`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_ReturnInfo { + return_type: WrappedSchema, + nullable: bool, +} + +impl TryFrom for FFI_ReturnInfo { + type Error = DataFusionError; + + fn try_from(value: ReturnInfo) -> Result { + let return_type = WrappedSchema(FFI_ArrowSchema::try_from(value.return_type())?); + Ok(Self { + return_type, + nullable: value.nullable(), + }) + } +} + +impl TryFrom for ReturnInfo { + type Error = DataFusionError; + + fn try_from(value: FFI_ReturnInfo) -> Result { + let return_type = DataType::try_from(&value.return_type.0)?; + + Ok(ReturnInfo::new(return_type, value.nullable)) + } +} diff --git a/datafusion/ffi/src/udf/return_type_args.rs b/datafusion/ffi/src/udf/return_type_args.rs new file mode 100644 index 0000000000000..a0897630e2ea9 --- /dev/null +++ b/datafusion/ffi/src/udf/return_type_args.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use abi_stable::{ + std_types::{ROption, RVec}, + StableAbi, +}; +use arrow::datatypes::DataType; +use datafusion::{ + common::exec_datafusion_err, error::DataFusionError, logical_expr::ReturnTypeArgs, + scalar::ScalarValue, +}; + +use crate::{ + arrow_wrappers::WrappedSchema, + util::{rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped}, +}; +use prost::Message; + +/// A stable struct for sharing a [`ReturnTypeArgs`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_ReturnTypeArgs { + arg_types: RVec, + scalar_arguments: RVec>>, + nullables: RVec, +} + +impl TryFrom> for FFI_ReturnTypeArgs { + type Error = DataFusionError; + + fn try_from(value: ReturnTypeArgs) -> Result { + let arg_types = vec_datatype_to_rvec_wrapped(value.arg_types)?; + let scalar_arguments: Result, Self::Error> = value + .scalar_arguments + .iter() + .map(|maybe_arg| { + maybe_arg + .map(|arg| { + let proto_value: datafusion_proto::protobuf::ScalarValue = + arg.try_into()?; + let proto_bytes: RVec = proto_value.encode_to_vec().into(); + Ok(proto_bytes) + }) + .transpose() + }) + .collect(); + let scalar_arguments = scalar_arguments?.into_iter().map(ROption::from).collect(); + + let nullables = value.nullables.into(); + Ok(Self { + arg_types, + scalar_arguments, + nullables, + }) + } +} + +// TODO(tsaucer) It would be good to find a better way around this, but it +// appears a restriction based on the need to have a borrowed ScalarValue +// in the arguments when converted to ReturnTypeArgs +pub struct ForeignReturnTypeArgsOwned { + arg_types: Vec, + scalar_arguments: Vec>, + nullables: Vec, +} + +pub struct ForeignReturnTypeArgs<'a> { + arg_types: &'a [DataType], + scalar_arguments: Vec>, + nullables: &'a [bool], +} + +impl TryFrom<&FFI_ReturnTypeArgs> for ForeignReturnTypeArgsOwned { + type Error = DataFusionError; + + fn try_from(value: &FFI_ReturnTypeArgs) -> Result { + let arg_types = rvec_wrapped_to_vec_datatype(&value.arg_types)?; + let scalar_arguments: Result, Self::Error> = value + .scalar_arguments + .iter() + .map(|maybe_arg| { + let maybe_arg = maybe_arg.as_ref().map(|arg| { + let proto_value = + datafusion_proto::protobuf::ScalarValue::decode(arg.as_ref()) + .map_err(|err| exec_datafusion_err!("{}", err))?; + let scalar_value: ScalarValue = (&proto_value).try_into()?; + Ok(scalar_value) + }); + Option::from(maybe_arg).transpose() + }) + .collect(); + let scalar_arguments = scalar_arguments?.into_iter().collect(); + + let nullables = value.nullables.iter().cloned().collect(); + + Ok(Self { + arg_types, + scalar_arguments, + nullables, + }) + } +} + +impl<'a> From<&'a ForeignReturnTypeArgsOwned> for ForeignReturnTypeArgs<'a> { + fn from(value: &'a ForeignReturnTypeArgsOwned) -> Self { + Self { + arg_types: &value.arg_types, + scalar_arguments: value + .scalar_arguments + .iter() + .map(|opt| opt.as_ref()) + .collect(), + nullables: &value.nullables, + } + } +} + +impl<'a> From<&'a ForeignReturnTypeArgs<'a>> for ReturnTypeArgs<'a> { + fn from(value: &'a ForeignReturnTypeArgs) -> Self { + ReturnTypeArgs { + arg_types: value.arg_types, + scalar_arguments: &value.scalar_arguments, + nullables: value.nullables, + } + } +} diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index f610f12c8244e..c6df324e9a17c 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -20,84 +20,14 @@ #[cfg(feature = "integration-tests")] mod tests { - use abi_stable::library::RootModule; - use datafusion::common::record_batch; use datafusion::error::{DataFusionError, Result}; - use datafusion::logical_expr::ScalarUDF; - use datafusion::prelude::{col, SessionContext}; + use datafusion::prelude::SessionContext; use datafusion_ffi::catalog_provider::ForeignCatalogProvider; use datafusion_ffi::table_provider::ForeignTableProvider; - use datafusion_ffi::tests::{create_record_batch, ForeignLibraryModuleRef}; - use datafusion_ffi::udf::ForeignScalarUDF; - use std::path::Path; + use datafusion_ffi::tests::create_record_batch; + use datafusion_ffi::tests::utils::get_module; use std::sync::Arc; - /// Compute the path to the library. It would be preferable to simply use - /// abi_stable::library::development_utils::compute_library_path however - /// our current CI pipeline has a `ci` profile that we need to use to - /// find the library. - pub fn compute_library_path( - target_path: &Path, - ) -> std::io::Result { - let debug_dir = target_path.join("debug"); - let release_dir = target_path.join("release"); - let ci_dir = target_path.join("ci"); - - let debug_path = M::get_library_path(&debug_dir.join("deps")); - let release_path = M::get_library_path(&release_dir.join("deps")); - let ci_path = M::get_library_path(&ci_dir.join("deps")); - - let all_paths = vec![ - (debug_dir.clone(), debug_path), - (release_dir, release_path), - (ci_dir, ci_path), - ]; - - let best_path = all_paths - .into_iter() - .filter(|(_, path)| path.exists()) - .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok()) - .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok()) - .max_by_key(|(_, date)| *date) - .map(|(dir, _)| dir) - .unwrap_or(debug_dir); - - Ok(best_path) - } - - fn get_module() -> Result { - let expected_version = datafusion_ffi::version(); - - let crate_root = Path::new(env!("CARGO_MANIFEST_DIR")); - let target_dir = crate_root - .parent() - .expect("Failed to find crate parent") - .parent() - .expect("Failed to find workspace root") - .join("target"); - - // Find the location of the library. This is specific to the build environment, - // so you will need to change the approach here based on your use case. - // let target: &std::path::Path = "../../../../target/".as_ref(); - let library_path = - compute_library_path::(target_dir.as_path()) - .map_err(|e| DataFusionError::External(Box::new(e)))? - .join("deps"); - - // Load the module - let module = ForeignLibraryModuleRef::load_from_directory(&library_path) - .map_err(|e| DataFusionError::External(Box::new(e)))?; - - assert_eq!( - module - .version() - .expect("Unable to call version on FFI module")(), - expected_version - ); - - Ok(module) - } - /// It is important that this test is in the `tests` directory and not in the /// library directory so we can verify we are building a dynamic library and /// testing it via a different executable. @@ -141,46 +71,6 @@ mod tests { test_table_provider(true).await } - /// This test validates that we can load an external module and use a scalar - /// udf defined in it via the foreign function interface. In this case we are - /// using the abs() function as our scalar UDF. - #[tokio::test] - async fn test_scalar_udf() -> Result<()> { - let module = get_module()?; - - let ffi_abs_func = - module - .create_scalar_udf() - .ok_or(DataFusionError::NotImplemented( - "External table provider failed to implement create_scalar_udf" - .to_string(), - ))?(); - let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?; - - let udf: ScalarUDF = foreign_abs_func.into(); - - let ctx = SessionContext::default(); - let df = ctx.read_batch(create_record_batch(-5, 5))?; - - let df = df - .with_column("abs_a", udf.call(vec![col("a")]))? - .with_column("abs_b", udf.call(vec![col("b")]))?; - - let result = df.collect().await?; - - let expected = record_batch!( - ("a", Int32, vec![-5, -4, -3, -2, -1]), - ("b", Float64, vec![-5., -4., -3., -2., -1.]), - ("abs_a", Int32, vec![5, 4, 3, 2, 1]), - ("abs_b", Float64, vec![5., 4., 3., 2., 1.]) - )?; - - assert!(result.len() == 1); - assert!(result[0] == expected); - - Ok(()) - } - #[tokio::test] async fn test_catalog() -> Result<()> { let module = get_module()?; diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs new file mode 100644 index 0000000000000..bbc23552def43 --- /dev/null +++ b/datafusion/ffi/tests/ffi_udf.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Add an additional module here for convenience to scope this to only +/// when the feature integtation-tests is built +#[cfg(feature = "integration-tests")] +mod tests { + + use arrow::datatypes::DataType; + use datafusion::common::record_batch; + use datafusion::error::{DataFusionError, Result}; + use datafusion::logical_expr::ScalarUDF; + use datafusion::prelude::{col, SessionContext}; + + use datafusion_ffi::tests::create_record_batch; + use datafusion_ffi::tests::utils::get_module; + use datafusion_ffi::udf::ForeignScalarUDF; + + /// This test validates that we can load an external module and use a scalar + /// udf defined in it via the foreign function interface. In this case we are + /// using the abs() function as our scalar UDF. + #[tokio::test] + async fn test_scalar_udf() -> Result<()> { + let module = get_module()?; + + let ffi_abs_func = + module + .create_scalar_udf() + .ok_or(DataFusionError::NotImplemented( + "External table provider failed to implement create_scalar_udf" + .to_string(), + ))?(); + let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?; + + let udf: ScalarUDF = foreign_abs_func.into(); + + let ctx = SessionContext::default(); + let df = ctx.read_batch(create_record_batch(-5, 5))?; + + let df = df + .with_column("abs_a", udf.call(vec![col("a")]))? + .with_column("abs_b", udf.call(vec![col("b")]))?; + + let result = df.collect().await?; + + let expected = record_batch!( + ("a", Int32, vec![-5, -4, -3, -2, -1]), + ("b", Float64, vec![-5., -4., -3., -2., -1.]), + ("abs_a", Int32, vec![5, 4, 3, 2, 1]), + ("abs_b", Float64, vec![5., 4., 3., 2., 1.]) + )?; + + assert!(result.len() == 1); + assert!(result[0] == expected); + + Ok(()) + } + + /// This test validates nullary input UDFs + #[tokio::test] + async fn test_nullary_scalar_udf() -> Result<()> { + let module = get_module()?; + + let ffi_abs_func = + module + .create_nullary_udf() + .ok_or(DataFusionError::NotImplemented( + "External table provider failed to implement create_scalar_udf" + .to_string(), + ))?(); + let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?; + + let udf: ScalarUDF = foreign_abs_func.into(); + + let ctx = SessionContext::default(); + let df = ctx.read_batch(create_record_batch(-5, 5))?; + + let df = df.with_column("time_now", udf.call(vec![]))?; + + let result = df.collect().await?; + + assert!(result.len() == 1); + assert_eq!( + result[0].column_by_name("time_now").unwrap().data_type(), + &DataType::Float64 + ); + + Ok(()) + } +} From 853f42f2d071e071798d1e95c079de9ce7e50fd9 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 29 Mar 2025 09:32:55 -0400 Subject: [PATCH 2/6] Add license text --- datafusion/ffi/src/tests/utils.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/ffi/src/tests/utils.rs b/datafusion/ffi/src/tests/utils.rs index f01f5253126a9..6465b17d9b60c 100644 --- a/datafusion/ffi/src/tests/utils.rs +++ b/datafusion/ffi/src/tests/utils.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::tests::ForeignLibraryModuleRef; use abi_stable::library::RootModule; use datafusion::error::{DataFusionError, Result}; From 009b77b95aeae64525fff12c64724aa6166970c6 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 29 Mar 2025 10:06:53 -0400 Subject: [PATCH 3/6] Correct error in documentation --- datafusion/ffi/src/udf/return_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/ffi/src/udf/return_info.rs b/datafusion/ffi/src/udf/return_info.rs index 27086088c5d6b..cf76ddd1db762 100644 --- a/datafusion/ffi/src/udf/return_info.rs +++ b/datafusion/ffi/src/udf/return_info.rs @@ -21,7 +21,7 @@ use datafusion::{error::DataFusionError, logical_expr::ReturnInfo}; use crate::arrow_wrappers::WrappedSchema; -/// A stable struct for sharing a [`ReturnTypeArgs`] across FFI boundaries. +/// A stable struct for sharing a [`ReturnInfo`] across FFI boundaries. #[repr(C)] #[derive(Debug, StableAbi)] #[allow(non_camel_case_types)] From 1a6707b78a9bc5624706a0d7b1ec6fdb18eb5b99 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 29 Mar 2025 10:07:20 -0400 Subject: [PATCH 4/6] Error message changed in test due to updated scalar coercion --- datafusion/sqllogictest/test_files/functions.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/functions.slt b/datafusion/sqllogictest/test_files/functions.slt index de1dbf74c29bf..20f79622a62c6 100644 --- a/datafusion/sqllogictest/test_files/functions.slt +++ b/datafusion/sqllogictest/test_files/functions.slt @@ -858,7 +858,7 @@ SELECT greatest(-1, 1, 2.3, 123456789, 3 + 5, -(-4), abs(-9.0)) 123456789 -query error 'greatest' does not support zero argument +query error Function 'greatest' user-defined coercion failed with "Error during planning: greatest was called without any arguments. It requires at least 1." SELECT greatest() query I @@ -1056,7 +1056,7 @@ SELECT least(-1, 1, 2.3, 123456789, 3 + 5, -(-4), abs(-9.0)) -1 -query error 'least' does not support zero arguments +query error Function 'least' user-defined coercion failed with "Error during planning: least was called without any arguments. It requires at least 1." SELECT least() query I From f3807f94793f22a7d3769ec759892912e30eb793 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 29 Mar 2025 10:09:30 -0400 Subject: [PATCH 5/6] Perform check of user defined types when looking for empty argument types --- datafusion/expr/src/type_coercion/functions.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index a2051c55999dc..3b34718062eb4 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -87,7 +87,7 @@ pub fn data_types_with_aggregate_udf( let signature = func.signature(); let type_signature = &signature.type_signature; - if current_types.is_empty() { + if current_types.is_empty() && type_signature != &TypeSignature::UserDefined { if type_signature.supports_zero_argument() { return Ok(vec![]); } else if type_signature.used_to_support_zero_arguments() { @@ -124,7 +124,7 @@ pub fn data_types_with_window_udf( let signature = func.signature(); let type_signature = &signature.type_signature; - if current_types.is_empty() { + if current_types.is_empty() && type_signature != &TypeSignature::UserDefined { if type_signature.supports_zero_argument() { return Ok(vec![]); } else if type_signature.used_to_support_zero_arguments() { @@ -161,7 +161,7 @@ pub fn data_types( ) -> Result> { let type_signature = &signature.type_signature; - if current_types.is_empty() { + if current_types.is_empty() && type_signature != &TypeSignature::UserDefined { if type_signature.supports_zero_argument() { return Ok(vec![]); } else if type_signature.used_to_support_zero_arguments() { From 158b51040489bcaea6322267b50e76fa8cbd147c Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 29 Mar 2025 10:31:31 -0400 Subject: [PATCH 6/6] Updated error messages during unit test --- datafusion/sql/tests/sql_integration.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 866c08ed0257e..9b8f949456598 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4094,7 +4094,7 @@ fn test_error_message_invalid_scalar_function_signature() { fn test_error_message_invalid_aggregate_function_signature() { error_message_test( "select sum()", - "Error during planning: 'sum' does not support zero arguments", + "Error during planning: Execution error: Function 'sum' user-defined coercion failed with \"Execution error: sum function requires 1 argument, got 0\"", ); // We keep two different prefixes because they clarify each other. // It might be incorrect, and we should consider keeping only one. @@ -4116,7 +4116,7 @@ fn test_error_message_invalid_window_function_signature() { fn test_error_message_invalid_window_aggregate_function_signature() { error_message_test( "select sum() over()", - "Error during planning: 'sum' does not support zero arguments", + "Error during planning: Execution error: Function 'sum' user-defined coercion failed with \"Execution error: sum function requires 1 argument, got 0\"", ); }