Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions rust/datafusion/examples/simple_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,28 @@ fn main() -> Result<()> {
pow,
);

// finally, register the UDF
ctx.register_udf(pow);
// at this point, we can use it or register it, depending on the use-case:
// * if the UDF is expected to be used throughout the program in different contexts,
// we can register it, and call it later:
ctx.register_udf(pow.clone()); // clone is only required in this example because we show both usages

// at this point, we can use it. Note that the code below can be in a
// scope on which we do not have access to `pow`.
// * if the UDF is expected to be used directly in the scope, `.call` it directly:
let expr = pow.call(vec![col("a"), col("b")]);

// get a DataFrame from the context
let df = ctx.table("t")?;

// get the udf registry.
let f = df.registry();

// equivalent to `'SELECT pow(a, b) FROM t'`
let df = df.select(vec![f.udf("pow", vec![col("a"), col("b")])?])?;
// if we do not have `pow` in the scope and we registered it, we can get it from the registry
let pow = df.registry().udf("pow")?;
// equivalent to expr
let expr1 = pow.call(vec![col("a"), col("b")]);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert_eq!(expr, expr1);

// equivalent to `'SELECT pow(a, b), pow(a, b) AS pow1 FROM t'`
let df = df.select(vec![
expr,
// alias so that they have different column names
expr1.alias("pow1"),
])?;

// note that "b" is f32, not f64. DataFusion coerces the types to match the UDF's signature.

Expand Down
11 changes: 4 additions & 7 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::execution::dataframe_impl::DataFrameImpl;
use crate::logical_plan::{Expr, FunctionRegistry, LogicalPlan, LogicalPlanBuilder};
use crate::logical_plan::{FunctionRegistry, LogicalPlan, LogicalPlanBuilder};
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
Expand Down Expand Up @@ -481,17 +481,14 @@ impl FunctionRegistry for ExecutionContextState {
self.scalar_functions.keys().cloned().collect()
}

fn udf(&self, name: &str, args: Vec<Expr>) -> Result<Expr> {
fn udf(&self, name: &str) -> Result<&ScalarUDF> {
let result = self.scalar_functions.get(name);
if result.is_none() {
Err(ExecutionError::General(
format!("There is no UDF named \"{}\" in the registry", name).to_string(),
))
} else {
Ok(Expr::ScalarUDF {
fun: result.unwrap().clone(),
args,
})
Ok(result.unwrap())
}
}
}
Expand Down Expand Up @@ -1051,7 +1048,7 @@ mod tests {
.project(vec![
col("a"),
col("b"),
ctx.registry().udf("my_add", vec![col("a"), col("b")])?,
ctx.registry().udf("my_add")?.call(vec![col("a"), col("b")]),
])?
.build()?;

Expand Down
2 changes: 1 addition & 1 deletion rust/datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ mod tests {

let f = df.registry();

let df = df.select(vec![f.udf("my_fn", vec![col("c12")])?])?;
let df = df.select(vec![f.udf("my_fn")?.call(vec![col("c12")])])?;
let plan = df.to_logical_plan();

// build query using SQL
Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,8 @@ pub trait FunctionRegistry {
/// Set of all available udfs.
fn udfs(&self) -> HashSet<String>;

/// Constructs a logical expression with a call to the udf.
fn udf(&self, name: &str, args: Vec<Expr>) -> Result<Expr>;
/// Returns a reference to the udf named `name`.
fn udf(&self, name: &str) -> Result<&ScalarUDF>;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

/// Builder for logical plans
Expand Down
11 changes: 10 additions & 1 deletion rust/datafusion/src/physical_plan/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::fmt;
use arrow::datatypes::Schema;

use crate::error::Result;
use crate::physical_plan::PhysicalExpr;
use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};

use super::{
functions::{
Expand Down Expand Up @@ -71,6 +71,15 @@ impl ScalarUDF {
fun: fun.clone(),
}
}

/// creates a logical expression with a call of the UDF
/// This utility allows using the UDF without requiring access to the registry.
pub fn call(&self, args: Vec<Expr>) -> Expr {
Expr::ScalarUDF {
fun: Arc::new(self.clone()),
args,
}
}
}

/// Create a physical expression of the UDF.
Expand Down