
Conversation

@andygrove (Member) commented Mar 28, 2020

Support for Scalar UDFs, allowing custom Rust code to run as an expression. Scalar UDFs are supported both in SQL and in plans built via LogicalPlanBuilder.

This allows users of DataFusion to add their own expressions, and it also provides a framework for adding useful built-in expressions to DataFusion itself.

The following unary math expressions are implemented as a starting point:

    ctx.register_udf(math_unary_function!("sqrt", sqrt));
    ctx.register_udf(math_unary_function!("sin", sin));
    ctx.register_udf(math_unary_function!("cos", cos));
    ctx.register_udf(math_unary_function!("tan", tan));
    ctx.register_udf(math_unary_function!("asin", asin));
    ctx.register_udf(math_unary_function!("acos", acos));
    ctx.register_udf(math_unary_function!("atan", atan));
    ctx.register_udf(math_unary_function!("floor", floor));
    ctx.register_udf(math_unary_function!("ceil", ceil));
    ctx.register_udf(math_unary_function!("round", round));
    ctx.register_udf(math_unary_function!("trunc", trunc));
    ctx.register_udf(math_unary_function!("abs", abs));
    ctx.register_udf(math_unary_function!("signum", signum));
    ctx.register_udf(math_unary_function!("exp", exp));
    ctx.register_udf(math_unary_function!("log", ln));
    ctx.register_udf(math_unary_function!("log2", log2));
    ctx.register_udf(math_unary_function!("log10", log10));

Macros are used to generate convenience methods for creating these expressions in a logical plan, so it is now possible to write something like:

    let plan = LogicalPlanBuilder::scan("", "", &schema, None)?
        .project(vec![sqrt(col("a")), log(col("b"))])?
        .build()?;
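
The convenience methods themselves could plausibly be generated by a macro along these lines. This is a sketch, not the PR's code: the Expr::ScalarFunction variant and its fields are assumed for illustration.

    // Hypothetical sketch: generate a `fn sqrt(e: Expr) -> Expr` helper
    // per UDF, so plans can be written as in the example above. The
    // Expr::ScalarFunction variant shown here is an assumption.
    macro_rules! unary_math_expr {
        ($FN_NAME:ident, $UDF_NAME:expr) => {
            pub fn $FN_NAME(e: Expr) -> Expr {
                Expr::ScalarFunction {
                    name: $UDF_NAME.to_string(),
                    args: vec![e],
                    return_type: DataType::Float64,
                }
            }
        };
    }

    // Expands to the sqrt() and log() helpers used in the plan above:
    unary_math_expr!(sqrt, "sqrt");
    unary_math_expr!(log, "log");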

@andygrove (Member, Author)

@kyle-mccarthy @jorgecarleitao FYI

@andygrove changed the title from "ARROW-6947: [Rust] [DataFusion] Scalar UDF support" to "[WIP] ARROW-6947: [Rust] [DataFusion] Scalar UDF support" on Mar 28, 2020
@andygrove marked this pull request as ready for review on March 28, 2020 16:26
@jorgecarleitao (Member)

The code itself looks really good. My only concern is that the API for registering a UDF involves a lot of boilerplate. Per the tests, registering a simple sqrt function requires the following amount of code:

    let sqrt: ScalarUdf = |args: &Vec<ArrayRef>| {
        let input = &args[0]
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("cast failed");

        let mut builder = Float64Builder::new(input.len());
        for i in 0..input.len() {
            builder.append_value(input.value(i).sqrt())?;
        }
        Ok(Arc::new(builder.finish()))
    };

    let sqrt_meta = ScalarFunction::new(
        "sqrt",
        vec![Field::new("n", DataType::Float64, true)],
        DataType::Float64,
        sqrt,
    );

IMO this is too much: 20 LOC, with an Arc, a downcast, a Builder, and the need to ensure that the schema matches the types of the downcast and the Builder, just for a simple scalar operation.

What if we provide a macro to simplify this declaration, e.g.

    udf!(Float64, Float64, |x| x.sqrt())

such that it is the macro's responsibility to pick the downcast type and builder?
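
A minimal sketch of such a macro, reusing the ScalarUdf boilerplate above (only the Float64-to-Float64 case is spelled out; a full version would match on each supported type pair):

    // Rough sketch of the proposed udf! macro: given the input and output
    // Arrow types, the macro picks the matching downcast and builder.
    macro_rules! udf {
        (Float64, Float64, $FUNC:expr) => {{
            let f: ScalarUdf = |args: &Vec<ArrayRef>| {
                let input = args[0]
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .expect("cast failed");
                let mut builder = Float64Builder::new(input.len());
                for i in 0..input.len() {
                    builder.append_value($FUNC(input.value(i)))?;
                }
                Ok(Arc::new(builder.finish()))
            };
            f
        }};
    }

    // usage, as proposed:
    let sqrt = udf!(Float64, Float64, |x: f64| x.sqrt());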

@andygrove (Member, Author)

Yes, I think that would be the next logical step now that there is a mechanism for executing scalar functions. Many of the math expressions (sqrt, sin, cos, tan, etc.) are going to be very similar, so it would make sense to use macros.

I think this could be a good follow-up JIRA & PR.

@andygrove (Member, Author)

@paddyhoran @nevi-me PTAL if you have the time.

@andygrove (Member, Author)

@jorgecarleitao Thanks for the suggestion, please take another look if you have time.

@jorgecarleitao (Member)

I went through it. It is now super simple for us to add a float64 function. I also like what you brought up about adding stateful functions. From my side, this is a significant improvement and should be merged.


I am sorry that I was not clear in my previous comment: my argument is that, for the developers using this library (our users), the API to declare and register a new UDF is cumbersome.

Compare the 26 LOC above with Spark's counterpart:

    // Define a regular Scala function
    val upper: String => String = _.toUpperCase

    // Define a UDF that wraps the upper Scala function defined above
    import org.apache.spark.sql.functions.udf
    val upperUDF = udf(upper)

I understand that this is not Scala, but I do believe that the boilerplate can be mitigated by exposing, as part of our public API, a macro that helps our users declare UDFs.

My hypothesis is that we should be able to write a macro as follows:

    scalarUDF!(&str, &dyn Fn(in_type) -> out_type)

so that the user can write

    let udf = scalarUDF!("sqrt", |x: f32| x.sqrt() as f64);
    ctx.register_udf(udf);

and the macro itself figures out what the builder and downcasts should be. A common use case is string manipulation.

IMO this would dramatically improve the user experience of adding a UDF, since the user could focus exclusively on the functionality itself. Of course, some optimizations are only possible when we write the full implementation (e.g. using SIMD or native ops), but for the average data engineer wanting to use Rust and DataFusion, reducing the barrier to writing a UDF significantly helps DataFusion's adoption, since UDFs are a feature that makes Spark so much more useful than the alternatives.
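
For comparison, here is roughly what the Spark upper example would look like with the current verbose API. This is a sketch following the sqrt boilerplate above; the StringArray/StringBuilder usage is assumed to mirror the Float64 builders.

    // Sketch: an upper-case string UDF in today's verbose style, showing
    // the boilerplate the proposed macro would eliminate.
    let upper: ScalarUdf = |args: &Vec<ArrayRef>| {
        let input = args[0]
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("cast failed");
        let mut builder = StringBuilder::new(input.len());
        for i in 0..input.len() {
            builder.append_value(&input.value(i).to_uppercase())?;
        }
        Ok(Arc::new(builder.finish()))
    };

    let upper_meta = ScalarFunction::new(
        "upper",
        vec![Field::new("s", DataType::Utf8, true)],
        DataType::Utf8,
        upper,
    );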

A Contributor commented on the following snippet:

    ScalarFunction::new(
        $NAME,
        vec![Field::new("n", DataType::Float64, true)],
        DataType::Float64,

This looks great; I'll try to do a proper review soon. Are there any considerations we need to make here given that math_unary_function assumes f64? Will this panic on f32 input?

@andygrove (Member, Author) replied:

That's a great point. We need the type coercion optimizer rule to take care of this by automatically casting expressions to the required type where possible, or failing at that stage if the types are not compatible. I will work on this today.
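
The idea, as a self-contained sketch (the Expr and DataType types below are simplified stand-ins, not DataFusion's actual ones): compare each argument's type against the UDF's declared signature and insert a cast wherever a lossless coercion exists.

    // Minimal self-contained sketch of argument coercion for a UDF call;
    // the types here are illustrative, not DataFusion's.
    #[derive(Debug, Clone, PartialEq)]
    enum DataType { Float32, Float64 }

    #[derive(Debug)]
    enum Expr {
        Column(String, DataType),
        Cast { expr: Box<Expr>, to: DataType },
    }

    impl Expr {
        fn data_type(&self) -> DataType {
            match self {
                Expr::Column(_, t) => t.clone(),
                Expr::Cast { to, .. } => to.clone(),
            }
        }
    }

    /// Insert casts so each argument matches the UDF's declared input
    /// type, or fail if no safe coercion exists.
    fn coerce(args: Vec<Expr>, expected: &[DataType]) -> Result<Vec<Expr>, String> {
        args.into_iter()
            .zip(expected)
            .map(|(arg, want)| {
                let have = arg.data_type();
                if have == *want {
                    Ok(arg)
                } else if have == DataType::Float32 && *want == DataType::Float64 {
                    // widening f32 -> f64 is lossless, so cast implicitly
                    Ok(Expr::Cast { expr: Box::new(arg), to: want.clone() })
                } else {
                    Err(format!("cannot coerce {:?} to {:?}", have, want))
                }
            })
            .collect()
    }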

@andygrove (Member, Author)

@jorgecarleitao I see what you mean now. Yes, that's a great point. I will do that either as part of this PR or a follow-up PR.

@andygrove (Member, Author)

@jorgecarleitao I filed https://issues.apache.org/jira/browse/ARROW-8253

@andygrove (Member, Author)

@paddyhoran I suppose it would make sense to move the implementation of these functions out of DataFusion and into Arrow, since they could be used directly there?

@paddyhoran (Contributor)

Yea, I was thinking the same thing but didn't want to hold this PR up. I also think that arrow could benefit from having a sub-module in compute that deals with RecordBatch objects as a lot of crates that build on top will use that abstraction.

@nevi-me (Contributor) commented Apr 4, 2020

> Yea, I was thinking the same thing but didn't want to hold this PR up. I also think that arrow could benefit from having a sub-module in compute that deals with RecordBatch objects as a lot of crates that build on top will use that abstraction.

There are two other alternatives, which are similar.

  1. We could adopt the CPP/Python implementation's ChunkedArray, which is a Vec<Arc<dyn Array>> like the one below. Compute functions that currently take arrays could take a chunked array, where for convenience an Array could be represented as a ChunkedArray with one chunk.

         // this is what I'm doing in the rust-dataframe library
         #[derive(Clone)]
         pub struct ChunkedArray {
             chunks: Vec<Arc<dyn Array>>,
             num_rows: usize,
             null_count: usize,
         }

  2. We could adopt the Datum from CPP, which IIRC behaves like an enum:

         pub enum Datum {
             Scalar(some_value), // haven't thought about what this would look like
             Array(Arc<dyn Array>),
             Chunk(ChunkedArray),
             // ...
         }

This would help DataFusion and other compute users handle literal values and mixed array/scalar operations better, by avoiding creating an array out of scalars.
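
As a self-contained illustration of that last point (simplified stand-in types, not the arrow crate's), a kernel taking a Datum can handle the scalar case without ever materializing a constant array:

    // Illustrative sketch: an add kernel over a Datum-like enum, where the
    // scalar case avoids allocating a constant array.
    enum Datum {
        Scalar(f64),
        Array(Vec<f64>), // stand-in for Arc<dyn Array>
    }

    fn add(left: &Datum, right: &Datum) -> Datum {
        match (left, right) {
            // array + scalar: no need to expand the scalar into an array
            (Datum::Array(a), Datum::Scalar(s)) | (Datum::Scalar(s), Datum::Array(a)) => {
                Datum::Array(a.iter().map(|v| v + s).collect())
            }
            (Datum::Array(a), Datum::Array(b)) => {
                Datum::Array(a.iter().zip(b).map(|(x, y)| x + y).collect())
            }
            (Datum::Scalar(a), Datum::Scalar(b)) => Datum::Scalar(a + b),
        }
    }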

@andygrove (Member, Author)

@nevi-me @paddyhoran It's time to release 0.17 ... are you OK with merging this one so we can follow up with another PR to move the math expressions into Arrow itself? The main reason I'd like to merge this PR for the release is that it enables DataFusion users to add their own UDFs.

@paddyhoran (Contributor) left a review:

Yea, I'm fine with merging this and following up on the other points when we come to a consensus.

@andygrove (Member, Author)

Thanks @paddyhoran
