Conversation

@jorgecarleitao
Member

@jorgecarleitao jorgecarleitao commented Jan 30, 2021

This PR adds support for ScalarValue to all builtin-functions and UDFs from DataFusion.

This allows executing builtin functions without having to create arrays and pass them to the kernels.

With this change, coding a new kernel or UDF is more time-consuming, as it is necessary to cater for both the scalar and the array case. OTOH, this leads to a major performance improvement in the scalar case, as we only need to perform one operation instead of one operation per row.
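Schematically, the idea can be sketched with toy types (a simplified illustration; the `Scalar`/`Array` variants mirror, but are not, the actual DataFusion definitions):

```rust
// Hypothetical, minimal sketch: a columnar value is either one value for the
// whole batch or one value per row, and a kernel can branch on that.
#[derive(Debug, PartialEq)]
enum ColumnarValue {
    Scalar(f64),
    Array(Vec<f64>),
}

fn sqrt(value: &ColumnarValue) -> ColumnarValue {
    match value {
        // scalar input: one operation for the whole batch
        ColumnarValue::Scalar(v) => ColumnarValue::Scalar(v.sqrt()),
        // array input: one operation per row
        ColumnarValue::Array(values) => {
            ColumnarValue::Array(values.iter().map(|v| v.sqrt()).collect())
        }
    }
}

fn main() {
    assert_eq!(sqrt(&ColumnarValue::Scalar(4.0)), ColumnarValue::Scalar(2.0));
    assert_eq!(
        sqrt(&ColumnarValue::Array(vec![1.0, 4.0, 9.0])),
        ColumnarValue::Array(vec![1.0, 2.0, 3.0])
    );
}
```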

@jorgecarleitao
Member Author

@seddonm1
Contributor

@jorgecarleitao yes this looks good 👍

This is going to take some time to apply but I like the idea of passing a function to apply to the string (for example). It would be good to get this merged soon so I can apply to the big Postgres functions PR.

@jorgecarleitao
Member Author

jorgecarleitao commented Jan 31, 2021

@alamb , when you wrote the date_trunc, did you have in mind having different granularities per row, or was it driven by the fact that the builtin functions only accepted arrays?

If the latter, do you think it would be ok to require granularity to always be a scalar as a regression / feature of this PR?

I am asking because date_trunc(array | scalar, array | scalar) is a "bit" more effort than date_trunc(scalar, array | scalar).

@Dandandan
Contributor

FWIW, PostgreSQL also doesn't support the non-scalar granularity for date_trunc

@alamb alamb left a comment

I like where this is headed @jorgecarleitao ❤️ I skimmed it quickly but will be happy to review it thoroughly when it is ready

Another thing this PR's functionality is likely to enable is more general 'constant folding' -- e.g the kind of thing @houqp is doing for booleans in #9309

Imagine an expression like A < (5 + 1) -- using this code we could evaluate the 5 + 1 into 6 at PLAN time rather than during execution. A more likely useful example is date_trunc(date_col, 'month') < date_trunc('2021-01-31', 'month') and other similar things
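A plan-time folding pass along these lines could be sketched as follows (a hypothetical, minimal expression type, not DataFusion's actual Expr):

```rust
// Hypothetical sketch of constant folding: rewrite Add(Lit(5), Lit(1)) into
// Lit(6) during planning, leaving column references untouched.
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    Lit(i64),
    Add(Box<Expr>, Box<Expr>),
    Lt(Box<Expr>, Box<Expr>),
}

fn fold(expr: Expr) -> Expr {
    match expr {
        Expr::Add(l, r) => match (fold(*l), fold(*r)) {
            // both sides are constants: evaluate at plan time
            (Expr::Lit(a), Expr::Lit(b)) => Expr::Lit(a + b),
            (l, r) => Expr::Add(Box::new(l), Box::new(r)),
        },
        Expr::Lt(l, r) => Expr::Lt(Box::new(fold(*l)), Box::new(fold(*r))),
        other => other,
    }
}

fn main() {
    // A < (5 + 1)  becomes  A < 6
    let expr = Expr::Lt(
        Box::new(Expr::Column("A".to_string())),
        Box::new(Expr::Add(Box::new(Expr::Lit(5)), Box::new(Expr::Lit(1)))),
    );
    assert_eq!(
        fold(expr),
        Expr::Lt(Box::new(Expr::Column("A".to_string())), Box::new(Expr::Lit(6)))
    );
}
```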

@alamb
Contributor

alamb commented Jan 31, 2021

@jorgecarleitao -- I agree with @Dandandan -- I think date_trunc(scalar, array) and date_trunc(scalar, scalar) are the really important cases. I have no use case for date_trunc(array, array).

@jorgecarleitao jorgecarleitao changed the title WIP: [DataFusion] Added support for scalarValue in Builtin functions. ARROW-11446: [DataFusion] Added support for scalarValue in Builtin functions. Jan 31, 2021

@jorgecarleitao jorgecarleitao marked this pull request as ready for review January 31, 2021 16:08
@jorgecarleitao
Member Author

This is ready for a first take.

Some notes:

  • This changes the UDF's API: they should now handle ColumnarValue, not ArrayRef.
  • whenever possible, I used generics instead of macros. I have been reasoning about code more easily with generics because they set the traits explicitly and are therefore IMO easier to use.
  • I had to add a new trait to bridge ScalarValue and ArrowPrimitiveType
  • I had to implement the TryFrom conversion for timestamp nanoseconds.
  • I have not added test coverage to the scalar cases of our existing functions. Longer term, I think we should develop a generic to handle this without having to test the two cases. I tried to do something like this, but I still had to copy-paste some code between generics, unfortunately.

Sorry for the long PR, but I did not find an easy way to split it up (besides #9378, which this PR is built on top of).

@seddonm1
Contributor

@jorgecarleitao -- I agree with @Dandandan -- I think date_trunc(scalar, array) and date_trunc(scalar, scalar) are the really important cases. I have no use case for date_trunc(array, array).

Having been trying to implement a lot of the Postgres logic recently: they all logically support the use of arrays for each of the parameters (i.e. date_trunc(array, array)). I have no idea how common the use of this functionality really is - and I suspect it is low - but if we are building the API for Postgres compatibility it would be good to solve.

@apache apache deleted a comment from github-actions bot Feb 1, 2021
@alamb
Contributor

alamb commented Feb 1, 2021

Having been trying to implement a lot of the Postgres logic recently: they all logically support the use of arrays for each of the parameters (i.e. date_trunc(array, array)). I have no idea how common the use of this functionality really is - and I suspect it is low - but if we are building the API for Postgres compatibility it would be good to solve.

This is a good point @seddonm1 - I am just starting to look at this PR, but I wonder if we could do something like have default implementations for func(scalar, scalar), func(scalar, array), and func(array, scalar) that are implemented in terms of func(array, array)

that way we would only have to supply func(array, array), and the scalar input cases could be handled on a case by case basis. I'll try and comment inline if I see how that might be done

@seddonm1
Contributor

seddonm1 commented Feb 1, 2021

@alamb here is a proof-of-concept for the regexp_replace Postgres function which has been built to support the possibility of passing in different function parameters for each row. If it were possible to tell whether the value was a Scalar or Array there would be major optimisation opportunities. I did some basic memoization of the Regex objects but that would not be as necessary if we knew Scalar v Array.

https://github.com/apache/arrow/pull/9243/files#diff-abe8768fe7124198cca7a84ad7b2c678b3cc8e5de3d1bc867d498536a2fdddc7R542

@alamb alamb left a comment

So I wrote up a big comment that github seems to have lost :(

Basically I spent 30 minutes studying this PR. I think it is one of the most important PRs for expression evaluation I have seen in DataFusion.

My major concern (other than the annoyance of the backward compatibility) is that it might put the implementation of built in functions even further out of reach for new contributors. The cognitive load of understanding ArrayRef in general and working with them is already large, and having to handle ColumnarValue will just make it more so.

Also this PR is too large for me to effectively review -- even with some more time (which I am not sure I can find) I am not sure I could fully grok it.

I wonder if we could do something like the following to both ease new developers as well as allow this PR to be implemented in chunks.

Make a new type that has the old array signature:

pub type SimpleScalarFunctionImplementation =
    Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;

And then make an adapter function that takes a simple signature and implements the more general one:

fn make_scalar_function(inner: SimpleScalarFunctionImplementation) -> ScalarFunctionImplementation {
    // invoke inner correctly, creating `ArrayRef` to hold a bunch of scalar values
}

That way you could keep most signatures the same and then implement the general cases one by one.
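Such an adapter could work roughly like this sketch with toy stand-in types (hypothetical; the real adapter would operate on Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef>> and ColumnarValue): broadcast every scalar argument into an array of the batch length, then call the array-only implementation.

```rust
// Hypothetical, simplified sketch of make_scalar_function (toy types, no
// error handling): scalars are expanded into arrays so `inner` only ever
// sees arrays.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Scalar(f64),
    Array(Vec<f64>),
}

fn make_scalar_function<F>(inner: F) -> impl Fn(&[ColumnarValue]) -> ColumnarValue
where
    F: Fn(&[Vec<f64>]) -> Vec<f64>,
{
    move |args: &[ColumnarValue]| {
        // batch length = length of any array argument; 1 if all are scalars
        let len = args
            .iter()
            .find_map(|arg| match arg {
                ColumnarValue::Array(a) => Some(a.len()),
                ColumnarValue::Scalar(_) => None,
            })
            .unwrap_or(1);
        // broadcast scalars to the batch length
        let arrays: Vec<Vec<f64>> = args
            .iter()
            .map(|arg| match arg {
                ColumnarValue::Array(a) => a.clone(),
                ColumnarValue::Scalar(v) => vec![*v; len],
            })
            .collect();
        ColumnarValue::Array(inner(&arrays))
    }
}

fn main() {
    // an "old style" array-only implementation: element-wise addition
    let add = |args: &[Vec<f64>]| -> Vec<f64> {
        args[0].iter().zip(args[1].iter()).map(|(a, b)| a + b).collect()
    };
    let f = make_scalar_function(add);
    let out = f(&[
        ColumnarValue::Array(vec![1.0, 2.0, 3.0]),
        ColumnarValue::Scalar(10.0),
    ]);
    assert_eq!(out, ColumnarValue::Array(vec![11.0, 12.0, 13.0]));
}
```

This keeps the scalar case correct (if not yet optimized), while functions can opt in to specialized scalar handling one by one.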

Thanks again @jorgecarleitao -- this is really awesome work.

Contributor

Suggested change
// `Ok` because no error occurred during the calculation (we should add one if exponent was [0, 1[ and the base < 0 because that panics!)
// `Ok` because no error occurred during the calculation (we should add one if exponent was [0, 1] and the base < 0 because that panics!)

Member Author

This represents an open interval: 0 <= exponent < 1 because (x)^1 = x for all x (including negative ones).

Contributor

cool -- sorry, I guess I am used to seeing an open interval written with a `)` -- so in this case something like [0, 1) to represent 0 <= exponent < 1 (e.g. here

Contributor

I think adding an example of using this pattern for implementing UDFs might be really helpful

Contributor

I may be missing it -- but this function only seems to be invoked once with the same set of type arguments -- does it need to be generic? Or more broadly, can we hoist this seemingly repeated handle pattern somewhere it can be reused (and reduce the cognitive load on people writing new functions?)

@jorgecarleitao jorgecarleitao Feb 2, 2021

The pattern depends on the input and output of the function. I.e. when the input is &str, then Utf8/LargeUtf8. When the output is a Native, then the output is PrimitiveArray<O::Native>. In general this construct depends on what the user is trying to achieve (w.r.t. input and output types).

I placed this here because it allows decoupling the pattern (of handling Scalar and Array) from the implementation of the logic (string_to_timestamp_nanos in this case).

In crypto_expressions we have a similar pattern, but in that case the function is &str -> AsRef<[u8]>, which allowed writing all the crypto SHA functions in a succinct manner. However, in that case, the output type is always Binary instead of LargeBinary for LargeStringArray, because the hashes are always smaller than i32::MAX. All of this was already written; I just expanded it for the two variants (scalar and array).

Note that crypto_expressions::handle and crypto_expressions::md5 are very similar, but their return types are different: handle receives a GenericStringArray, but returns a BinaryArray. This is because MD5's signature is string -> string, while sha is string -> binary.
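The decoupling described above can be sketched with plain Rust types (hypothetical, simplified stand-ins for the arrow string arrays): the scalar/array plumbing is written once in a `handle`-style function, and the actual logic is just a &str -> String closure passed in.

```rust
// Hypothetical sketch: `handle` owns the Scalar/Array (and null) plumbing;
// callers only supply the per-value logic.
#[derive(Debug, Clone, PartialEq)]
enum StringValue {
    Scalar(Option<String>),
    Array(Vec<Option<String>>),
}

fn handle<F>(value: &StringValue, op: F) -> StringValue
where
    F: Fn(&str) -> String,
{
    match value {
        // nulls (None) pass through untouched in both variants
        StringValue::Scalar(v) => StringValue::Scalar(v.as_ref().map(|x| op(x.as_str()))),
        StringValue::Array(a) => StringValue::Array(
            a.iter().map(|v| v.as_ref().map(|x| op(x.as_str()))).collect(),
        ),
    }
}

fn main() {
    let upper = |s: &str| s.to_uppercase();
    assert_eq!(
        handle(&StringValue::Scalar(Some("md5".to_string())), upper),
        StringValue::Scalar(Some("MD5".to_string()))
    );
    assert_eq!(
        handle(&StringValue::Array(vec![Some("a".to_string()), None]), upper),
        StringValue::Array(vec![Some("A".to_string()), None])
    );
}
```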

@alamb alamb Feb 2, 2021

makes sense -- thank you for the explanation

Contributor

this function is effectively applying a str->str function op to all values in args? Some comments might be helpful

@jorgecarleitao
Member Author

Hi @alamb , thanks a lot for your review and comments. I generally agree that this makes it more difficult to write a UDF and a function implementation in general.

I like that idea. I have now pushed a commit with it. I will now address the remaining ones.

@jorgecarleitao
Member Author

Ok, I have addressed the comments.

The API change for UDFs is that people need to call make_scalar_function on their existing UDFs, as seen in the diff of the example.

Out of curiosity, did anyone run the benchmarks? I do not have a machine suitable for that, but I am obviously curious :)

@alamb alamb left a comment

I think it is looking good now @jorgecarleitao -- thank you 👍

Contributor

👍

@Dandandan
Contributor

@jorgecarleitao what kind of benchmarks are you interested in? AFAIK, most benchmarks do not depend much on this; I expect it to be most impactful in cases where the projection itself takes a lot of the time, but most benchmarks spend most of the time on joins/aggregates.

@alamb
Contributor

alamb commented Feb 6, 2021

@andygrove / @Dandandan / @seddonm1 / @maxburke / @jhorstmann -- what are your thoughts on this one?

It is a significant enough change I think someone using DataFusion in their projects, other than myself should weigh in. I like it a lot -- and I think it could serve as the basis for constant folding (e.g. transform A + (5 + 7) --> A + 12 at plan time)

@Dandandan
Contributor

I think changes here look good, and will be helpful to implement functions which typically operate on (scalar, array). And this will help to make those cases (quite a bit) more efficient. I think that can be particularly helpful when having complex expressions in WHERE clauses / projections.

@alamb I am not sure I understand why folding should be done at the physical plan level, or why it really depends on this PR; it should be possible without the changes in this PR (just by having some Expr -> Expr rules)? They could share the same evaluation code though?

Contributor

Suggested change
let result = (inner)(&args);
let result = inner(&args);

Contributor

Suggested change
let result = a.as_ref().map(|x| (op)(x).as_ref().to_vec());
let result = a.as_ref().map(|x| op(x).as_ref().to_vec());

Contributor

Suggested change
let result = a.as_ref().map(|x| (op)(x).as_ref().to_vec());
let result = a.as_ref().map(|x| op(x).as_ref().to_vec());

Contributor

Suggested change
let result = a.as_ref().map(|x| md5_process(x));
let result = a.as_ref().map(md5_process);

@alamb
Contributor

alamb commented Feb 6, 2021

@alamb I am not sure I understand why folding should be done on the physical plan level or really depending on this PR , it should be possible without changes in this PR (just by having some Expr -> Expr rules)? They could share the same evaluation code though?

@Dandandan -- yes, I think constant folding looks like Expr --> Expr. But as you hint at, if the physical evaluation of functions is entirely separate from the plan time evaluation, we will end up with two parallel implementations for evaluation that need to be kept in sync (one for Expr and one for the physical runtime) -- using the same implementation for both I think avoids a lot of potential inconsistency (and source of bugs)

We'll see if that turns out to be possible, but I think it should be

Contributor

Suggested change
Ok(array.iter().map(|x| x.map(|x| op(x))).collect())
Ok(array.iter().map(|x| x.map(op)).collect())

Contributor

Suggested change
let result = a.as_ref().map(|x| md5_process(x));
let result = a.as_ref().map(md5_process);

@maxburke
Contributor

maxburke commented Feb 6, 2021

@andygrove / @Dandandan / @seddonm1 / @maxburke / @jhorstmann -- what are your thoughts on this one?

It is a significant enough change I think someone using DataFusion in their projects, other than myself should weigh in. I like it a lot -- and I think it could serve as the basis for constant folding (e.g. transform A + (5 + 7) --> A + 12 at plan time)

On the surface this looks like it'll be a fantastic change to have; I am curious to see what the measured impact on query times will be.

(cc @velvia / @mcassels)

@jorgecarleitao
Member Author

Rebased. 💦

@jorgecarleitao
Member Author

This is now ready to merge. This will collide with an also large PR, #9243. We have a function to enable compatibility, but it is still some work :(

@alamb
Contributor

alamb commented Feb 12, 2021

@seddonm1 -- what do you think about merge order of this PR and #9243 ? (which will conflict)

@seddonm1
Contributor

Unfortunately (for me) this logically does go first as being able to identify ScalarValue would give a huge performance advantage.

I am happy to rework the other one after this is merged.

@alamb
Contributor

alamb commented Feb 12, 2021

Sounds good -- @jorgecarleitao let's get it merged ! It looks like it needs another rebase and then we'll get it in

@alamb alamb added the needs-rebase A PR that needs to be rebased by the author label Feb 12, 2021
@codecov-io

codecov-io commented Feb 12, 2021

Codecov Report

Merging #9376 (27b01cf) into master (88e9eb8) will decrease coverage by 0.16%.
The diff coverage is 64.97%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #9376      +/-   ##
==========================================
- Coverage   82.27%   82.11%   -0.17%     
==========================================
  Files         234      235       +1     
  Lines       54594    54714     +120     
==========================================
+ Hits        44919    44929      +10     
- Misses       9675     9785     +110     
Impacted Files Coverage Δ
rust/datafusion/src/execution/dataframe_impl.rs 93.83% <ø> (ø)
...datafusion/src/physical_plan/expressions/binary.rs 85.53% <ø> (-0.32%) ⬇️
...st/datafusion/src/physical_plan/expressions/mod.rs 71.42% <ø> (-18.58%) ⬇️
rust/datafusion/src/physical_plan/mod.rs 86.00% <ø> (ø)
rust/datafusion/src/scalar.rs 51.63% <19.14%> (-5.11%) ⬇️
...datafusion/src/physical_plan/crypto_expressions.rs 52.45% <43.13%> (-22.55%) ⬇️
...tafusion/src/physical_plan/datetime_expressions.rs 69.17% <64.60%> (-24.12%) ⬇️
...datafusion/src/physical_plan/string_expressions.rs 67.12% <68.57%> (-19.84%) ⬇️
.../datafusion/src/physical_plan/array_expressions.rs 43.33% <77.77%> (+11.51%) ⬆️
...datafusion/src/physical_plan/expressions/nullif.rs 86.88% <86.88%> (ø)
... and 8 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 88e9eb8...27b01cf. Read the comment docs.

@velvia
Contributor

velvia commented Feb 13, 2021

Commenting since @maxburke pinged me.

On the surface I think this is a great change from the performance perspective. I totally agree that being able to deal with scalars instead of just arrays adds huge room for optimization. I have always thought that always needing intermediate arrays slowed down the processing of DataFusion significantly, for certain cases, and is also cache unfriendly.

On the other hand, I hear what @alamb and others are saying that it adds complexity to what is already nontrivial. I agree with that.

I wonder if it is possible to get the best of both worlds, by extending the Array trait slightly and having a subclass of Array which denotes scalars, like ScalarArray which do not need Buffer storage and just represents constant scalars. This way, functions would only need to deal with Array, but can recognize this subclass ScalarArray and do optimizations that way. This is a very half-formed thought at the moment. The train of thought is just what if the Array was not strictly a buffer-based representation but just a way to access columnar data, and in certain cases represents scalars.

Contributor

This is definitely one of the functions I would expect to benefit the most from this change, given that for NULLIF the right hand argument is likely to be a scalar most of the time.

@seddonm1
Contributor

I wonder if it is possible to get the best of both worlds, by extending the Array trait slightly and having a subclass of Array which denotes scalars, like ScalarArray which do not need Buffer storage and just represents constant scalars. This way, functions would only need to deal with Array, but can recognize this subclass ScalarArray and do optimizations that way. This is a very half-formed thought at the moment. The train of thought is just what if the Array was not strictly a buffer-based representation but just a way to access columnar data, and in certain cases represents scalars.

I also have many instances where knowing the ScalarArray vs Array would provide huge performance opportunities in the big PR implement Postgres functions : #9243 - especially for things like Regex matching. It would be relatively trivial to implement if it could be pattern matched.

@jorgecarleitao
Member Author

@velvia , @seddonm1 My opinion atm is that it would not really help much on the compute side, as we would still need to write separate logic for the 4 cases (array, scalar), (scalar, array), (scalar, scalar), (array, array), which is exactly what this PR already proposes.

For example, a subtraction of two arrays goes along the lines of

let iter = lhs.values().iter().zip(rhs.values().iter()).map(|(l, r)| l - r);
let array = unsafe { PrimitiveArray::<T>::from_trusted_len_iter(iter) };
// .values() is a slice

If we had an implementation of PrimitiveScalarArray<T>, we could change .values() to be an iterator, but then we lose the benefits of contiguous regions, as Rust can no longer assume that values is a contiguous memory region (slices have that property, iterators do not). If both sides are scalars, we want to create a PrimitiveScalarArray<T>, so, again, we need to branch. In all cases, we need a match (lhs, rhs). I think that DataFusion in this respect is doing things right.
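The four-way dispatch described above can be sketched with toy types (hypothetical; the real kernels operate on PrimitiveArray<T> and ScalarValue):

```rust
// Hypothetical sketch: each (scalar, array) combination of a binary kernel
// gets its own branch, which is exactly what this PR proposes.
#[derive(Debug, PartialEq)]
enum Value {
    Scalar(i64),
    Array(Vec<i64>),
}

fn subtract(lhs: &Value, rhs: &Value) -> Value {
    use Value::*;
    match (lhs, rhs) {
        // scalar - scalar: a single operation
        (Scalar(l), Scalar(r)) => Scalar(l - r),
        // scalar - array and array - scalar: one pass, no broadcast array
        (Scalar(l), Array(r)) => Array(r.iter().map(|r| l - r).collect()),
        (Array(l), Scalar(r)) => Array(l.iter().map(|l| l - r).collect()),
        // array - array: the usual element-wise kernel
        (Array(l), Array(r)) => Array(l.iter().zip(r).map(|(l, r)| l - r).collect()),
    }
}

fn main() {
    assert_eq!(subtract(&Value::Scalar(5), &Value::Scalar(2)), Value::Scalar(3));
    assert_eq!(
        subtract(&Value::Array(vec![5, 6]), &Value::Scalar(2)),
        Value::Array(vec![3, 4])
    );
}
```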

To change the Array trait we basically need a change in the arrow specification so that all implementations agree on how to communicate such information via IPC, ffi, etc, so, that is mailing list material :)

Note that this PR addresses @alamb 's concern by introducing an adapter that people can use if they do not want to bother implementing the scalar variants.

@alamb
Contributor

alamb commented Feb 14, 2021

Note that this PR addresses @alamb 's concern by introducing an adapter that people can use if they do not want to bother implementing the scalar variants.

Yes, the adapter function I think lowers the barrier to implementing initial scalar functions and should be an easier upgrade path.

TL;DR: one can call make_scalar_function(your_old_func_implementation) and not worry about the updates.

I think this PR is ready and is a great step forward. Let's merge it and continue to iterate.

@alamb alamb closed this in 8547c61 Feb 14, 2021
@alamb alamb removed the needs-rebase A PR that needs to be rebased by the author label Feb 15, 2021
sgnkc pushed a commit to sgnkc/arrow that referenced this pull request Feb 17, 2021
…nctions.

This PR adds support for `ScalarValue` to all builtin-functions and UDFs from DataFusion.

This allows executing builtin functions without having to create arrays and pass them to the kernels.

With this change, coding a new kernel or UDF is more time-consuming, as it is necessary to cater for both the scalar and the array case. OTOH, this leads to a major performance improvement in the scalar case, as we only need to perform one operation instead of one operation per row.

Closes apache#9376 from jorgecarleitao/scalar_fnc

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
