diff --git a/datafusion-examples/examples/async_udf.rs b/datafusion-examples/examples/async_udf.rs index 22e759de40f7a..59d89cb744d1c 100644 --- a/datafusion-examples/examples/async_udf.rs +++ b/datafusion-examples/examples/async_udf.rs @@ -194,7 +194,10 @@ impl AsyncScalarUDFImpl for AskLLM { /// is processing the query, so you may wish to make actual network requests /// on a different `Runtime`, as explained in the `thread_pools.rs` example /// in this directory. - async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result { + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + ) -> Result { // in a real UDF you would likely want to special case constant // arguments to improve performance, but this example converts the // arguments to arrays for simplicity. @@ -229,6 +232,6 @@ impl AsyncScalarUDFImpl for AskLLM { }) .collect(); - Ok(Arc::new(result_array)) + Ok(ColumnarValue::from(Arc::new(result_array) as ArrayRef)) } } diff --git a/datafusion/expr/src/async_udf.rs b/datafusion/expr/src/async_udf.rs index a62d4d5341f08..811476dfd995d 100644 --- a/datafusion/expr/src/async_udf.rs +++ b/datafusion/expr/src/async_udf.rs @@ -19,7 +19,6 @@ use crate::utils::{arc_ptr_eq, arc_ptr_hash}; use crate::{ udf_equals_hash, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, }; -use arrow::array::ArrayRef; use arrow::datatypes::{DataType, FieldRef}; use async_trait::async_trait; use datafusion_common::error::Result; @@ -48,7 +47,10 @@ pub trait AsyncScalarUDFImpl: ScalarUDFImpl { } /// Invoke the function asynchronously with the async arguments - async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result; + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + ) -> Result; } /// A scalar UDF that must be invoked using async methods @@ -95,7 +97,7 @@ impl AsyncScalarUDF { pub async fn invoke_async_with_args( &self, args: ScalarFunctionArgs, - ) -> Result { + ) -> Result { self.inner.invoke_async_with_args(args).await } } diff --git a/datafusion/physical-expr/src/async_scalar_function.rs b/datafusion/physical-expr/src/async_scalar_function.rs index 00134565ea443..b434694a20cc8 100644 --- a/datafusion/physical-expr/src/async_scalar_function.rs +++ b/datafusion/physical-expr/src/async_scalar_function.rs @@ -192,7 +192,7 @@ impl AsyncFuncExpr { ); } - let datas = result_batches + let datas = ColumnarValue::values_to_arrays(&result_batches)? .iter() .map(|b| b.to_data()) .collect::>(); diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index da9b6e37a6445..e702fb4c4ebaa 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -444,12 +444,11 @@ impl AsyncScalarUDFImpl for AsyncUpper { } /// This method is called to execute the async UDF and is similar - /// to the normal `invoke_with_args` except it returns an `ArrayRef` - /// instead of `ColumnarValue` and is `async`. + /// to the normal `invoke_with_args` except it is `async`. async fn invoke_async_with_args( &self, args: ScalarFunctionArgs, - ) -> Result { + ) -> Result { let value = &args.args[0]; // This function simply implements a simple string to uppercase conversion // but can be used for any async operation such as network calls. @@ -464,7 +463,7 @@ impl AsyncScalarUDFImpl for AsyncUpper { } _ => return internal_err!("Expected a string argument, got {:?}", value), }; - Ok(result) + Ok(ColumnarValue::from(result)) } } ``` @@ -548,7 +547,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf` # async fn invoke_async_with_args( # &self, # args: ScalarFunctionArgs, -# ) -> Result { +# ) -> Result { # trace!("Invoking async_upper with args: {:?}", args); # let value = &args.args[0]; # let result = match value { @@ -562,7 +561,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf` # } # _ => return internal_err!("Expected a string argument, got {:?}", value), # }; -# Ok(result) +# Ok(ColumnarValue::from(result)) # } # } use datafusion::execution::context::SessionContext; diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index eece034ce33e9..e18b6682b8e98 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -24,6 +24,48 @@ **Note:** DataFusion `50.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. You can see the current [status of the `50.0.0 `release here](https://github.com/apache/datafusion/issues/16799) +### `AsyncScalarUDFImpl::invoke_async_with_args` returns `ColumnarValue` + +In order to enable single value optimizations and be consistent with other +user defined function APIs, the `AsyncScalarUDFImpl::invoke_async_with_args` method now +returns a `ColumnarValue` instead of a `ArrayRef`. + +To upgrade, change the return type of your implementation + +```rust +# /* comment to avoid running +impl AsyncScalarUDFImpl for AskLLM { + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + _option: &ConfigOptions, + ) -> Result { + .. + return array_ref; // old code + } +} +# */ +``` + +To return a `ColumnarValue` + +```rust +# /* comment to avoid running +impl AsyncScalarUDFImpl for AskLLM { + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + _option: &ConfigOptions, + ) -> Result { + .. + return ColumnarValue::from(array_ref); // new code + } +} +# */ +``` + +See [#16896](https://github.com/apache/datafusion/issues/16896) for more details. + ### `SessionState`, `SessionConfig`, and `OptimizerConfig` returns `&Arc` instead of `&ConfigOptions` To provide broader access to `ConfigOptions` and reduce required clones, some