From c5c8a48950eb9080a076d432e97dbdc2152c3ab3 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 6 Aug 2024 10:04:09 -0400 Subject: [PATCH 01/11] Blog post on using UDFs in python --- ...08-06-datafusion-python-udf-comparisons.md | 595 ++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 _posts/2024-08-06-datafusion-python-udf-comparisons.md diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-08-06-datafusion-python-udf-comparisons.md new file mode 100644 index 00000000..c7bbdba5 --- /dev/null +++ b/_posts/2024-08-06-datafusion-python-udf-comparisons.md @@ -0,0 +1,595 @@ +--- +layout: post +title: "Comparing approaches to User Defined Functions in Apache Datafusion using Python" +date: "2024-08-06 00:00:00" +author: timsaucer +categories: [tutorial] +--- + + +# Writing User Defined Functions in Apache Datafusion using Python + +## Personal Context + +For a few months now I’ve been working with Apache DataFusion, a fast query engine written in rust. +From my experience the language that nearly all data scientists are working in is Python. In +general, for in memory work people often stick to pandas and pyspark for larger tasks that cannot +fit into memory. Polars is also growing extremely fast. + +Personally, I would love a single query approach that is fast for both in memory usage and can +extend to large batch processing to exploit parallelization. I think DataFusion, coupled with +Ballista, may provide this solution. + +As I’m testing, I’m primarily limiting my work to the datafusion-python project, a wrapper around +the rust library. This wrapper gives you the speed advantages of keeping all of the data in the +rust implementation and the ergonomics of working in python. Personally, I would prefer to work +purely in rust, but I also recognize that since the industry works in python we should meet the +people where they are. + +## User Defined Functions + +The focus of this post is User Defined Functions. The DataFusion library gives a lot of useful +functions already for doing dataframe manipulation. These are going to be similar to those you +find in other dataframe libraries. You’ll be able to do simple arithmetic, create substrings of +columns, or find the average value across a group of rows. These cover most of the use cases +you’ll need in a DataFrame. + +However, there will always arise times when you want a custom function. By using user defined +functions (UDFs) you open the world of possibilities of your code. Sometimes there simply isn’t an +easy way to use built in functions to achieve your goals. + +In the following, I’m going to demonstrate two example use cases. These are based on real world +problems I’ve encountered. Also I want to demonstrate the approach of “make it work, make it work +well, make it fast” that is a motto I’ve seen thrown around in data science. + +I will demonstrate three approaches to writing UDFs. In order of increasing performance they are + +- Writing a pure python function to do your computation +- Using the pyarrow libraries in python to accelerate your processing +- Writing a UDF in rust and exposing it to python + +Additionally I will demonstrate two variants of this. The first will be nearly identical to the +pyarrow library approach to simplicity of understanding how to connect the rust code to python. The +second version we will do the iteration through the input arrays ourselves to give even greater +flexibility to the user. 
Here are the two example use cases, taken from my own work but generalized.

### Use Case 1: Scalar Function

I have a DataFrame and a list of tuples that I’m interested in. I want to filter out the dataframe
to only have values that match those tuples from certain columns in the dataframe. For example,
suppose I have a table of sales line items. There are many columns, but I am interested in three: a
part key, supplier key, and return status. I want only to return a dataframe with a specific
combination of these three values.

Probably the most ergonomic way to do this without a UDF is to turn that list of tuples into a
dataframe itself, perform a join, and select the columns from the original dataframe. If we were
working in pyspark we would probably broadcast join the dataframe created from the tuple list since
it is tiny. In practice, I have found that with some dataframe libraries performing a filter rather
than a join can be significantly faster. This is worth profiling for your specific use case.

### Use Case 2: Aggregate or Window Function

I have a dataframe with many values that I want to aggregate. I have already analyzed it and
determined there is a noise level below which I do not want to include in my analysis. I want to
compute a sum of only values that are above my noise threshold.

This can be done fairly easily without leaning on a User Defined Aggregate Function (UDAF). You can
simply filter the dataframe and then aggregate using the built in `sum` function. Here, we
demonstrate doing this as a UDF primarily as an example of how to write UDAFs. We will use the
pyarrow compute approach.

## Pure Python approach

The fastest way (developer time, not code time) for me to implement the scalar problem solution
was to do something along the lines of “for each row, check the values of interest contains that
tuple”. I’ve published this as [an example](https://github.com/apache/datafusion-python/blob/main/examples/python-udf-comparisons.py)
in the [datafusion-python repository](https://github.com/apache/datafusion-python). Here is an
example of how this can be done:

```python
values_of_interest = [
    (1530, 4031, "N"),
    (6530, 1531, "N"),
    (5618, 619, "N"),
    (8118, 8119, "N"),
]

def is_of_interest_impl(
    partkey_arr: pa.Array,
    suppkey_arr: pa.Array,
    returnflag_arr: pa.Array,
) -> pa.Array:
    result = []
    for idx, partkey in enumerate(partkey_arr):
        partkey = partkey.as_py()
        suppkey = suppkey_arr[idx].as_py()
        returnflag = returnflag_arr[idx].as_py()
        value = (partkey, suppkey, returnflag)
        result.append(value in values_of_interest)

    return pa.array(result)


is_of_interest = udf(
    is_of_interest_impl,
    [pa.int64(), pa.int64(), pa.utf8()],
    pa.bool_(),
    "stable",
)

df_udf_filter = df_lineitem.filter(
    is_of_interest(col("l_partkey"), col("l_suppkey"), col("l_returnflag"))
)
```

When working with a DataFusion UDF in python, you define your function to take in some number of
expressions. During the evaluation, these will get computed into their corresponding values and
passed to your UDF as a pyarrow Array. We must return an Array also with the same number of
elements (rows). So the UDF example just iterates through all of the arrays and checks to see if
the tuple created from these columns matches any of those that we’re looking for.
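As an aside, the snippet above assumes a `df_lineitem` DataFrame already exists. A minimal,
hypothetical setup is sketched below; the parquet path is an assumption for illustration and is not
part of the original example.

```python
import pyarrow as pa
from datafusion import SessionContext, col, udf

ctx = SessionContext()
# Any TPC-H lineitem data with l_partkey, l_suppkey, and l_returnflag
# columns will work; the file path here is made up.
df_lineitem = ctx.read_parquet("data/lineitem.parquet")
```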
I’ll repeat because this is something that tripped me up the first time I wrote a UDF for
datafusion: **Datafusion UDFs, even scalar UDFs, process an array of values at a time not a single
row.** This is different from some other dataframe libraries and you may need to recognize a slight
change in mentality.

Some important lines here are the lines like `partkey = partkey.as_py()`. When we do this, we pay a
heavy cost. Now instead of keeping the analysis in the rust code, we have to take the values in the
array and convert them over to python objects. In this case we end up getting two numbers and a
string as real python objects, complete with reference counting and all. Also we are iterating
through the array in python rather than natively in rust. These will **significantly** slow down
your code. Any time you have to cross the barrier where you change values inside the rust arrays
into python objects or vice versa you will pay a **heavy** cost in that transformation. You will
want to design your UDFs to avoid this as much as possible.

## Python approach using pyarrow compute

DataFusion uses [Apache Arrow](https://arrow.apache.org/) as it’s in memory data format. This can
be seen in the way that Arrow Arrays are passed into the UDFs. We can take advantage of the fact
that [pyarrow](https://github.com/apache/arrow-rs), the python arrow wrapper, provides a variety of
useful functions. In the example below, we are only using a few of the boolean functions and the
equality function. Each of these functions takes two arrays and analyzes them row by row. In the
below example, we shift the logic around a little since we are now operating on an entire array of
values instead of checking a single row ourselves.

```python
import pyarrow.compute as pc

def udf_using_pyarrow_compute_impl(
    partkey_arr: pa.Array,
    suppkey_arr: pa.Array,
    returnflag_arr: pa.Array,
) -> pa.Array:
    results = None
    for partkey, suppkey, returnflag in values_of_interest:
        filtered_partkey_arr = pc.equal(partkey_arr, partkey)
        filtered_suppkey_arr = pc.equal(suppkey_arr, suppkey)
        filtered_returnflag_arr = pc.equal(returnflag_arr, returnflag)

        resultant_arr = pc.and_(filtered_partkey_arr, filtered_suppkey_arr)
        resultant_arr = pc.and_(resultant_arr, filtered_returnflag_arr)

        if results is None:
            results = resultant_arr
        else:
            results = pc.or_(results, resultant_arr)

    return results


udf_using_pyarrow_compute = udf(
    udf_using_pyarrow_compute_impl,
    [pa.int64(), pa.int64(), pa.utf8()],
    pa.bool_(),
    "stable",
)

df_udf_pyarrow_compute = df_lineitem.filter(
    udf_using_pyarrow_compute(col("l_partkey"), col("l_suppkey"), col("l_returnflag"))
)
```

The idea in the code above is that we will iterate through each of the values of interest, which we
expect to be small. For each of the columns, we compare the value of interest to its corresponding
array using `pyarrow.compute.equal`. This will give us three boolean arrays. We have a match to
the tuple if we have a row in all three arrays that is true, so we use `pyarrow.compute.and_`. Our
return value from the UDF needs to be true for any row that matches at least one tuple in the
values of interest list, so we take the result from the current loop and combine it with the
running result using `pyarrow.compute.or_`.

From my benchmarking, switching from the approach of converting values into python objects to this
approach of using the pyarrow built in functions leads to about a 10x speed improvement in this
simple problem.
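If the row-wise behavior of these compute kernels is unfamiliar, a tiny standalone example may make
it concrete; the arrays below are made up purely for illustration.

```python
import pyarrow as pa
import pyarrow.compute as pc

partkey = pa.array([1530, 6530, 99])
returnflag = pa.array(["N", "R", "N"])

# Each kernel returns a boolean array with one entry per input row
matches = pc.and_(pc.equal(partkey, 1530), pc.equal(returnflag, "N"))
print(matches.to_pylist())  # [True, False, False]
```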
It’s worth noting that almost all of the pyarrow compute functions expect to take one or two arrays
as their arguments. If you need to write a UDF that is evaluating three or more columns, you’ll
need to do something akin to what we’ve shown here.

## Rust UDF with Python wrapper

This is the most complicated approach, but has the potential to be the most performant. What we
will do here is write a Rust function to perform our computation and then expose that function to
python. I know of two use cases where I would recommend this approach. The first is the case when
the pyarrow compute functions are insufficient for your needs. Perhaps your code is too complex or
could be greatly simplified if you pulled in some outside dependency. The second use case is when
you have written a UDF that you’re sharing across multiple projects and have hardened the approach.
It is possible that you can implement your function in rust to give a speed improvement and then
every project that is using this shared UDF will benefit from those updates.

When deciding to use this approach, it’s worth considering how much you think you’ll actually
benefit from the rust implementation to decide if it’s worth the additional effort to maintain and
deploy the python wheels you generate. It is certainly not necessary for every use case.

Due to the excellent work by the python arrow team, we can simplify our work to needing only two
dependencies on the rust side, [arrow-rs](https://github.com/apache/arrow-rs) and
[pyo3](https://pyo3.rs/). I have posted a [minimal example](https://github.com/timsaucer/tuple_filter_example).
You’ll need [maturin](https://github.com/PyO3/maturin) to build the project, and I recommend using
release mode when building.

```bash
maturin develop --release
```

When you write your UDF in rust you generally will need to take these steps

1. Write a function description that takes in some number of python generic objects.
2. Convert these objects to Arrow Arrays of the appropriate type(s).
3. Perform your computation and create a resultant Array.
4. Convert the array into a python generic object.

For the conversion to and from python objects, we can take advantage of the
`ArrayData::from_pyarrow_bound` and `ArrayData::to_pyarrow` functions. All that remains is to
perform your computation.

We are going to demonstrate doing this computation in two ways. The first is to mimic what we’ve
done in the above approach using pyarrow. In the second we demonstrate iterating through the three
arrays ourselves.

In our first approach, we can expect the performance to be nearly identical to when we used the
pyarrow compute functions. On the rust side we will have slightly less overhead but the heavy
lifting portions of the code are essentially the same between this rust implementation and the
pyarrow approach above.

The reason for demonstrating this, even though it doesn’t provide a significant speedup over
python, is primarily to demonstrate how to make the transition from python to rust with a python
wrapper. In the second implementation you can see how we can iterate through all of the arrays
ourselves.
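For anyone building along, the crate setup is small. Below is a sketch of what the Cargo.toml might
look like; the crate versions and layout here are assumptions on my part, not values taken from the
example repository.

```toml
[package]
name = "tuple_filter_example"
version = "0.1.0"
edition = "2021"

[lib]
# A cdylib is required so maturin can produce a python extension module
crate-type = ["cdylib"]

[dependencies]
# The "pyarrow" feature enables the ArrayData conversion helpers used below
arrow = { version = "52", features = ["pyarrow"] }
pyo3 = { version = "0.21", features = ["extension-module"] }
```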
```rust
// These examples assume the arrow crate with its "pyarrow" feature enabled
// and pyo3 with the "extension-module" feature.
use arrow::array::{Array, ArrayData, BooleanArray, PrimitiveArray, StringArray};
use arrow::compute;
use arrow::datatypes::Int64Type;
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

#[pyfunction]
pub fn tuple_filter_fn(
    py: Python<'_>,
    partkey_expr: &Bound<'_, PyAny>,
    suppkey_expr: &Bound<'_, PyAny>,
    returnflag_expr: &Bound<'_, PyAny>,
) -> PyResult<PyObject> {
    let partkey_arr: PrimitiveArray<Int64Type> =
        ArrayData::from_pyarrow_bound(partkey_expr)?.into();
    let suppkey_arr: PrimitiveArray<Int64Type> =
        ArrayData::from_pyarrow_bound(suppkey_expr)?.into();
    let returnflag_arr: StringArray = ArrayData::from_pyarrow_bound(returnflag_expr)?.into();

    let values_of_interest = vec![
        (1530, 4031, "N".to_string()),
        (6530, 1531, "N".to_string()),
        (5618, 619, "N".to_string()),
        (8118, 8119, "N".to_string()),
    ];

    let mut res: Option<BooleanArray> = None;

    for (partkey, suppkey, returnflag) in &values_of_interest {
        let filtered_partkey_arr = BooleanArray::from_unary(&partkey_arr, |p| p == *partkey);
        let filtered_suppkey_arr = BooleanArray::from_unary(&suppkey_arr, |s| s == *suppkey);
        let filtered_returnflag_arr =
            BooleanArray::from_unary(&returnflag_arr, |s| s == returnflag);

        let part_and_supp = compute::and(&filtered_partkey_arr, &filtered_suppkey_arr)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        let resultant_arr = compute::and(&part_and_supp, &filtered_returnflag_arr)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;

        res = match res {
            Some(r) => compute::or(&r, &resultant_arr).ok(),
            None => Some(resultant_arr),
        };
    }

    res.unwrap().into_data().to_pyarrow(py)
}


#[pymodule]
fn tuple_filter_example(module: &Bound<'_, PyModule>) -> PyResult<()> {
    module.add_function(wrap_pyfunction!(tuple_filter_fn, module)?)?;
    Ok(())
}
```

To use this we use the `udf` function in datafusion-python just as before.

```python
from datafusion import udf
import pyarrow as pa
from tuple_filter_example import tuple_filter_fn

udf_using_custom_rust_fn = udf(
    tuple_filter_fn,
    [pa.int64(), pa.int64(), pa.utf8()],
    pa.bool_(),
    "stable",
)
```

That's it! We've now got a third party rust UDF with python wrappers working with DataFusion's
python bindings!

### Rust UDF with initialization

Looking at the code above, you can see that it is hard coding the values we're interested in. There
are many types of UDFs that don't require any additional data provided to them before they start
the computation, but ours is not one of them. The code above is sloppy, so let's clean it up.

We want to write the function to take some additional data. A limitation of the UDFs we create is
that they expect to operate on entire arrays of data at a time. We can get around this problem by
creating an initializer for our UDF. We do this by defining a rust struct that contains the data we
need and implement two methods on this struct, `new` and `__call__`. By doing this we will create a
python object that is callable, so it can be the function we provide to `udf`.
```rust
#[pyclass]
pub struct TupleFilterClass {
    values_of_interest: Vec<(i64, i64, String)>,
}

#[pymethods]
impl TupleFilterClass {
    #[new]
    fn new(values_of_interest: Vec<(i64, i64, String)>) -> Self {
        Self {
            values_of_interest,
        }
    }

    fn __call__(
        &self,
        py: Python<'_>,
        partkey_expr: &Bound<'_, PyAny>,
        suppkey_expr: &Bound<'_, PyAny>,
        returnflag_expr: &Bound<'_, PyAny>,
    ) -> PyResult<PyObject> {
        let partkey_arr: PrimitiveArray<Int64Type> =
            ArrayData::from_pyarrow_bound(partkey_expr)?.into();
        let suppkey_arr: PrimitiveArray<Int64Type> =
            ArrayData::from_pyarrow_bound(suppkey_expr)?.into();
        let returnflag_arr: StringArray = ArrayData::from_pyarrow_bound(returnflag_expr)?.into();

        let mut res: Option<BooleanArray> = None;

        for (partkey, suppkey, returnflag) in &self.values_of_interest {
            let filtered_partkey_arr = BooleanArray::from_unary(&partkey_arr, |p| p == *partkey);
            let filtered_suppkey_arr = BooleanArray::from_unary(&suppkey_arr, |s| s == *suppkey);
            let filtered_returnflag_arr =
                BooleanArray::from_unary(&returnflag_arr, |s| s == returnflag);

            let part_and_supp = compute::and(&filtered_partkey_arr, &filtered_suppkey_arr)
                .map_err(|e| PyValueError::new_err(e.to_string()))?;
            let resultant_arr = compute::and(&part_and_supp, &filtered_returnflag_arr)
                .map_err(|e| PyValueError::new_err(e.to_string()))?;

            res = match res {
                Some(r) => compute::or(&r, &resultant_arr).ok(),
                None => Some(resultant_arr),
            };
        }

        res.unwrap().into_data().to_pyarrow(py)
    }
}

#[pymodule]
fn tuple_filter_example(module: &Bound<'_, PyModule>) -> PyResult<()> {
    module.add_class::<TupleFilterClass>()?;
    Ok(())
}
```

When you write this, you don't have to call your constructor `new`. The more important part is that
you have `#[new]` designated on the function. With this you can provide any kind of data you need
during processing. Using this initializer in python is fairly straightforward.

```python
from datafusion import udf
import pyarrow as pa
from tuple_filter_example import TupleFilterClass

tuple_filter_class = TupleFilterClass(values_of_interest)

udf_using_custom_rust_fn_with_data = udf(
    tuple_filter_class,
    [pa.int64(), pa.int64(), pa.utf8()],
    pa.bool_(),
    "stable",
    name="tuple_filter_with_data"
)
```

When you use this approach you will need to provide a `name` argument to `udf`. This is because our
class/struct does not get the `__qualname__` attribute that the `udf` function is looking for. You
can give this udf any name you choose.

### Rust UDF with direct iteration

The final version of our scalar UDF is one where we implement it in rust and iterate through all of
the arrays ourselves. If you are iterating through more than 3 arrays at a time I recommend looking
at [izip](https://docs.rs/itertools/latest/itertools/macro.izip.html) in the
[itertools crate](https://crates.io/crates/itertools). For ease of understanding and since we only
have 3 arrays here I will just explicitly create my own tuple.
```rust
// In addition to the imports shown earlier, this version uses BooleanBuffer
use arrow::buffer::BooleanBuffer;

#[pyclass]
pub struct TupleFilterDirectIterationClass {
    values_of_interest: Vec<(i64, i64, String)>,
}

#[pymethods]
impl TupleFilterDirectIterationClass {
    #[new]
    fn new(values_of_interest: Vec<(i64, i64, String)>) -> Self {
        Self { values_of_interest }
    }

    fn __call__(
        &self,
        py: Python<'_>,
        partkey_expr: &Bound<'_, PyAny>,
        suppkey_expr: &Bound<'_, PyAny>,
        returnflag_expr: &Bound<'_, PyAny>,
    ) -> PyResult<PyObject> {
        let partkey_arr: PrimitiveArray<Int64Type> =
            ArrayData::from_pyarrow_bound(partkey_expr)?.into();
        let suppkey_arr: PrimitiveArray<Int64Type> =
            ArrayData::from_pyarrow_bound(suppkey_expr)?.into();
        let returnflag_arr: StringArray = ArrayData::from_pyarrow_bound(returnflag_expr)?.into();

        let values_to_search: Vec<(&i64, &i64, &str)> = (&self.values_of_interest)
            .iter()
            .map(|(a, b, c)| (a, b, c.as_str()))
            .collect();

        let values = partkey_arr
            .values()
            .iter()
            .zip(suppkey_arr.values().iter())
            .zip(returnflag_arr.iter())
            .map(|((a, b), c)| (a, b, c.unwrap_or_default()))
            .map(|v| values_to_search.contains(&v));

        let res: BooleanArray = BooleanBuffer::from_iter(values).into();

        res.into_data().to_pyarrow(py)
    }
}
```

We convert the `values_of_interest` into a vector of borrowed types so that we can do a fast search
without creating additional memory. The other option is to turn the `returnflag` into a `String`
but that memory allocation is unnecessary. After taht we use two `zip` operations so that we can
iterate over all three columns in a single pass. Since each `zip` will return a tuple of two
elements, a quick `map` turns them into the tuple format we need. Also, `StringArray` is a little
different in the buffer it uses, so it is treated slightly differently from the others.

## User Defined Aggregate Function

Writing a user defined aggregate function or user defined window function is slightly more complex
than writing scalar functions. This is because we must accumulate values and there is no guarantee
that one batch will contain all of the values we are aggregating over. For this we need to define
an `Accumulator` which will do a few things.

- Process a batch and compute an internal state
- Share the state so that we can combine multiple batches
- Merge the results across multiple batches
- Return the final result

In the example below, we're going to look at customer orders and we want to know, per customer ID,
how much they have ordered in total. We want to ignore small orders, which we define as anything
over 5000.
```python
from typing import List

from datafusion import Accumulator, col, udaf
import pyarrow as pa
import pyarrow.compute as pc

IGNORE_THRESHOLD = 5000.0

class AboveThersholdAccum(Accumulator):
    def __init__(self) -> None:
        self._sum = 0.0

    def update(self, values: pa.Array) -> None:
        over_threshold = pc.greater(values, pa.scalar(IGNORE_THRESHOLD))
        sum_above = pc.sum(values.filter(over_threshold)).as_py()
        if sum_above is None:
            sum_above = 0.0
        self._sum = self._sum + sum_above

    def merge(self, states: List[pa.Array]) -> None:
        self._sum = self._sum + pc.sum(states[0]).as_py()

    def state(self) -> List[pa.Scalar]:
        return [pa.scalar(self._sum)]

    def evaluate(self) -> pa.Scalar:
        return pa.scalar(self._sum)

sum_above_threshold = udaf(AboveThersholdAccum, [pa.float64()], pa.float64(), [pa.float64()], 'stable')

df_orders.aggregate([col("o_custkey")],[sum_above_threshold(col("o_totalprice")).alias("sales")]).show()
```

Since we are doing a `sum` we can keep a single value as our internal state. When we call `update()`
we will process a single array and update the internal state, which we share with the `state()`
function. For larger batches we may `merge()` these states. It is important to note that the
`states` in the `merge()` function are an array of the values returned from `state()`. It is
entirely possible that the `merge` function is significantly different from the `update`, though in
our example they are very similar.

## Performance Comparison

For the scalar functions above, we performed a timing evaluation, repeating the operation 100
times. For this simple example these are our results.

```
+-----------------------------+--------------+---------+
| approach                    | Average Time | Std Dev |
+-----------------------------+--------------+---------+
| python udf                  | 4.969        | 0.062   |
| simple filter               | 1.075        | 0.022   |
| explicit filter             | 0.685        | 0.063   |
| pyarrow compute             | 0.529        | 0.017   |
| arrow rust compute          | 0.511        | 0.034   |
| arrow rust compute as class | 0.502        | 0.011   |
| rust custom iterator        | 0.478        | 0.009   |
+-----------------------------+--------------+---------+
```

As expected, the conversion to python objects is by far the worst performance. As soon as we drop
into using any functions that keep the data entirely on the rust side we see a near 10x speed
improvement. Then as we increase our complexity from using pyarrow compute functions to
implementing the UDF in rust we see incremental improvements. Our fastest approach, iterating
through the arrays ourselves, operates nearly 10% faster than the pyarrow compute approach.

## Final Thoughts and Recommendations

For anyone who is curious about [DataFusion](https://datafusion.apache.org/) I highly recommend
giving it a try. This post was designed to make it easier for new users of the python implementation
to work with User Defined Functions by giving a few examples of how one might implement these.

When it comes to designing UDFs, I strongly recommend seeing if you can write your UDF using
[pyarrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure python
objects. These will give you enormous speed benefits. If you must do something that isn't well
represented by the pyarrow compute functions, then I would consider using a Rust based UDF in the
manner shown above.

Lastly, the Apache Arrow and DataFusion community is an active group of very helpful people working
to make a great tool.
If you want to get involved, please take a look at the
[online documenation](https://datafusion.apache.org/python/) and jump in to help with one of the
[open issues](https://github.com/apache/datafusion-python/issues).

From 93c56027e8cf3bf8ab38df285d5fa4abc8325708 Mon Sep 17 00:00:00 2001
From: Tim Saucer
Date: Mon, 12 Aug 2024 21:10:26 -0400
Subject: [PATCH 02/11] Addressing review comments

---
 ...08-06-datafusion-python-udf-comparisons.md | 71 +++++++++++--------
 1 file changed, 41 insertions(+), 30 deletions(-)

diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-08-06-datafusion-python-udf-comparisons.md
index c7bbdba5..c298bddf 100644
--- a/_posts/2024-08-06-datafusion-python-udf-comparisons.md
+++ b/_posts/2024-08-06-datafusion-python-udf-comparisons.md
@@ -1,6 +1,6 @@
 ---
 layout: post
-title: "Comparing approaches to User Defined Functions in Apache Datafusion using Python"
+title: "Comparing approaches to User Defined Functions in Apache DataFusion using Python"
 date: "2024-08-06 00:00:00"
 author: timsaucer
 categories: [tutorial]
@@ -24,36 +24,44 @@ See the License for the specific language governing permissions and
 limitations under the License.
 {% endcomment %}
 -->
-# Writing User Defined Functions in Apache Datafusion using Python
+# Writing User Defined Functions in Apache DataFusion using Python

 ## Personal Context

-For a few months now I’ve been working with Apache DataFusion, a fast query engine written in rust.
-From my experience the language that nearly all data scientists are working in is Python. In
-general, for in memory work people often stick to pandas and pyspark for larger tasks that cannot
-fit into memory. Polars is also growing extremely fast.
+For a few months now I’ve been working with [Apache DataFusion](https://datafusion.apache.org/), a
+fast query engine written in rust. From my experience the language that nearly all data scientists
+are working in is Python. In general, often stick to [pandas](https://pandas.pydata.org/) for
+in-memory tasks and [pyspark](https://spark.apache.org/) for larger tasks that require distributed
+processing.
+
+In addition to DataFusion, there is another rust based newcomer to the DataFrame world,
+[Polars](https://pola.rs/). It is growing extremely fast, and it serves many of the same use cases
+as DataFusion. For my use cases, I'm interested in DataFusion because I want to be able to build
+small scale tests rapidly and then scale them up to larger distributed systems with ease. I do
+recommend evaluating Polars for in memory work.

 Personally, I would love a single query approach that is fast for both in memory usage and can
 extend to large batch processing to exploit parallelization. I think DataFusion, coupled with
 Ballista, may provide this solution.

-As I’m testing, I’m primarily limiting my work to the datafusion-python project, a wrapper around
-the rust library. This wrapper gives you the speed advantages of keeping all of the data in the
+As I’m testing, I’m primarily limiting my work to the
+[datafusion-python](https://datafusion.apache.org/python/) project, a wrapper around the rust
+DataFusion library. This wrapper gives you the speed advantages of keeping all of the data in the
 rust implementation and the ergonomics of working in python. Personally, I would prefer to work
 purely in rust, but I also recognize that since the industry works in python we should meet the
 people where they are.
-## User Defined Functions +## User-Defined Functions -The focus of this post is User Defined Functions. The DataFusion library gives a lot of useful -functions already for doing dataframe manipulation. These are going to be similar to those you -find in other dataframe libraries. You’ll be able to do simple arithmetic, create substrings of +The focus of this post is User-Defined Functions (UDFs). The DataFusion library gives a lot of +useful functions already for doing DataFrame manipulation. These are going to be similar to those +you find in other DataFrame libraries. You’ll be able to do simple arithmetic, create substrings of columns, or find the average value across a group of rows. These cover most of the use cases you’ll need in a DataFrame. -However, there will always arise times when you want a custom function. By using user defined -functions (UDFs) you open the world of possibilities of your code. Sometimes there simply isn’t an -easy way to use built in functions to achieve your goals. +However, there will always arise times when you want a custom function. With UDFs you open a +world of possibilities of your code. Sometimes there simply isn’t an easy way to use built-in +functions to achieve your goals. In the following, I’m going to demonstrate two example use cases. These are based on real world problems I’ve encountered. Also I want to demonstrate the approach of “make it work, make it work @@ -74,26 +82,26 @@ Here are the two example use cases, taken from my own work but generalized. ### Use Case 1: Scalar Function -I have a DataFrame and a list of tuples that I’m interested in. I want to filter out the dataframe -to only have values that match those tuples from certain columns in the dataframe. For example, +I have a DataFrame and a list of tuples that I’m interested in. I want to filter out the DataFrame +to only have values that match those tuples from certain columns in the DataFrame. For example, suppose I have a table of sales line items. There are many columns, but I am interested in three: a -part key, supplier key, and return status. I want only to return a dataframe with a specific +part key, supplier key, and return status. I want only to return a DataFrame with a specific combination of these three values. Probably the most ergonomic way to do this without UDF is to turn that list of tuples into a -dataframe itself, perform a join, and select the columns from the original dataframe. If we were -working in pyspark we would probably broadcast join the dataframe created from the tuple list since -it is tiny. In practice, I have found that with some dataframe libraries performing a filter rather +DataFrame itself, perform a join, and select the columns from the original DataFrame. If we were +working in pyspark we would probably broadcast join the DataFrame created from the tuple list since +it is tiny. In practice, I have found that with some DataFrame libraries performing a filter rather than a join can be significantly faster. This is worth profiling for your specific use case. ### Use Case 2: Aggregate or Window Function -I have a dataframe with many values that I want to aggregate. I have already analyzed it and +I have a DataFrame with many values that I want to aggregate. I have already analyzed it and determined there is a noise level below which I do not want to include in my analysis. I want to compute a sum of only values that are above my noise threshold. This can be done fairly easy without leaning on a User Defined Aggegate Function (UDAF). 
You can -simply filter the dataframe and then aggregate using the built in `sum` function. Here, we +simply filter the DataFrame and then aggregate using the built-in `sum` function. Here, we demonstrate doing this as a UDF primarily as an example of how to write UDAFs. We will use the pyarrow compute approach. @@ -101,7 +109,8 @@ pyarrow compute approach. The fastest way (developer time, not code time) for me to implement the scalar problem solution was to do something along the lines of “for each row, check the values of interest contains that -tuple”. I’ve published this as [an example](https://github.com/apache/datafusion-python/blob/main/examples/python-udf-comparisons.py) +tuple”. I’ve published this as +[an example](https://github.com/apache/datafusion-python/blob/main/examples/python-udf-comparisons.py) in the [datafusion-python repository](https://github.com/apache/datafusion-python). Here is an example of how this can be done: @@ -128,7 +137,8 @@ def is_of_interest_impl( return pa.array(result) - +# Wrap our custom function with `datafusion.udf`, annotating expected +# parameter and return types is_of_interest = udf( is_of_interest_impl, [pa.int64(), pa.int64(), pa.utf8()], @@ -148,8 +158,8 @@ elements (rows). So the UDF example just iterates through all of the arrays and the tuple created from these columns matches any of those that we’re looking for. I’ll repeat because this is something that tripped me up the first time I wrote a UDF for -datafusion: **Datafusion UDFs, even scalar UDFs, process an array of values at a time not a single -row.** This is different from some other dataframe libraries and you may need to recognize a slight +datafusion: **DataFusion UDFs, even scalar UDFs, process an array of values at a time not a single +row.** This is different from some other DataFrame libraries and you may need to recognize a slight change in mentality. Some important lines here are the lines like `partkey = partkey.as_py()`. When we do this, we pay a @@ -163,9 +173,10 @@ design your UDFs to avoid this as much as possible. ## Python approach using pyarrow compute -DataFusion uses [Apache Arrow](https://arrow.apache.org/) as it’s in memory data format. This can +DataFusion uses [Apache Arrow](https://arrow.apache.org/) as its in-memory data format. This can be seen in the way that Arrow Arrays are passed into the UDFs. We can take advantage of the fact -that [pyarrow](https://github.com/apache/arrow-rs), the python arrow wrapper, provides a variety of +that [pyarrow](https://arrow.apache.org/docs/python/), the canonical Python Arrow implementation, +provides a variety of useful functions. In the example below, we are only using a few of the boolean functions and the equality function. Each of these functions takes two arrays and analyzes them row by row. In the below example, we shift the logic around a little since we are now operating on an entire array of @@ -217,7 +228,7 @@ of tuples exists, so we take the result from the current loop and perform a `pya on it. From my benchmarking, switching from approach of converting values into python objects to this -approach of using the pyarrow built in functions leads to about a 10x speed improvement in this +approach of using the pyarrow built-in functions leads to about a 10x speed improvement in this simple problem. 
It’s worth noting that almost all of the pyarrow compute functions expect to take one or two arrays From cb3b1f16ad2fcde9fa6ee6f6028d1e9fc45afa1a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 12 Aug 2024 21:11:23 -0400 Subject: [PATCH 03/11] Small typo --- _posts/2024-08-06-datafusion-python-udf-comparisons.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-08-06-datafusion-python-udf-comparisons.md index c298bddf..2ad7cdae 100644 --- a/_posts/2024-08-06-datafusion-python-udf-comparisons.md +++ b/_posts/2024-08-06-datafusion-python-udf-comparisons.md @@ -60,7 +60,7 @@ columns, or find the average value across a group of rows. These cover most of t you’ll need in a DataFrame. However, there will always arise times when you want a custom function. With UDFs you open a -world of possibilities of your code. Sometimes there simply isn’t an easy way to use built-in +world of possibilities in your code. Sometimes there simply isn’t an easy way to use built-in functions to achieve your goals. In the following, I’m going to demonstrate two example use cases. These are based on real world From 104f846c083db0734afad89351eff37ee83aca9f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 12 Aug 2024 21:20:20 -0400 Subject: [PATCH 04/11] Capitalization --- ...08-06-datafusion-python-udf-comparisons.md | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-08-06-datafusion-python-udf-comparisons.md index 2ad7cdae..16faa6f0 100644 --- a/_posts/2024-08-06-datafusion-python-udf-comparisons.md +++ b/_posts/2024-08-06-datafusion-python-udf-comparisons.md @@ -29,12 +29,12 @@ limitations under the License. ## Personal Context For a few months now I’ve been working with [Apache DataFusion](https://datafusion.apache.org/), a -fast query engine written in rust. From my experience the language that nearly all data scientists +fast query engine written in Rust. From my experience the language that nearly all data scientists are working in is Python. In general, often stick to [pandas](https://pandas.pydata.org/) for in-memory tasks and [pyspark](https://spark.apache.org/) for larger tasks that require distributed processing. -In addition to DataFusion, there is another rust based newcomer to the DataFrame world, +In addition to DataFusion, there is another Rust based newcomer to the DataFrame world, [Polars](https://pola.rs/). It is growing extremely fast, and it serves many of the same use cases as DataFusion. For my use cases, I'm interested in DataFusion because I want to be able to build small scale tests rapidly and then scale them up to larger distributed systems with ease. I do @@ -45,10 +45,10 @@ extend to large batch processing to exploit parallelization. I think DataFusion, Ballista, may provide this solution. As I’m testing, I’m primarily limiting my work to the -[datafusion-python](https://datafusion.apache.org/python/) project, a wrapper around the rust +[datafusion-python](https://datafusion.apache.org/python/) project, a wrapper around the Rust DataFusion library. This wrapper gives you the speed advantages of keeping all of the data in the -rust implementation and the ergonomics of working in python. Personally, I would prefer to work -purely in rust, but I also recognize that since the industry works in python we should meet the +Rust implementation and the ergonomics of working in Python. 
Personally, I would prefer to work +purely in Rust, but I also recognize that since the industry works in Python we should meet the people where they are. ## User-Defined Functions @@ -69,12 +69,12 @@ well, make it fast” that is a motto I’ve seen thrown around in data science. I will demonstrate three approaches to writing UDFs. In order of increasing performance they are -- Writing a pure python function to do your computation -- Using the pyarrow libraries in python to accelerate your processing -- Writing a UDF in rust and exposing it to python +- Writing a pure Python function to do your computation +- Using the pyarrow libraries in Python to accelerate your processing +- Writing a UDF in Rust and exposing it to Python Additionally I will demonstrate two variants of this. The first will be nearly identical to the -pyarrow library approach to simplicity of understanding how to connect the rust code to python. The +pyarrow library approach to simplicity of understanding how to connect the Rust code to Python. The second version we will do the iteration through the input arrays ourselves to give even greater flexibility to the user. @@ -151,7 +151,7 @@ df_udf_filter = df_lineitem.filter( ) ``` -When working with a DataFusion UDF in python, you define your function to take in some number of +When working with a DataFusion UDF in Python, you define your function to take in some number of expressions. During the evaluation, these will get computed into their corresponding values and passed to your UDF as a pyarrow Array. We must return an Array also with the same number of elements (rows). So the UDF example just iterates through all of the arrays and checks to see if @@ -163,12 +163,12 @@ row.** This is different from some other DataFrame libraries and you may need to change in mentality. Some important lines here are the lines like `partkey = partkey.as_py()`. When we do this, we pay a -heavy cost. Now instead of keeping the analysis in the rust code, we have to take the values in the -array and convert them over to python objects. In this case we end up getting two numbers and a -string as real python objects, complete with reference counting and all. Also we are iterating -through the array in python rather than rust native. These will **significantly** slow down your -code. Any time you have to cross the barrier where you change values inside the rust arrays into -python objects or vice versa you will pay **heavy** cost in that transformation. You will want to +heavy cost. Now instead of keeping the analysis in the Rust code, we have to take the values in the +array and convert them over to Python objects. In this case we end up getting two numbers and a +string as real Python objects, complete with reference counting and all. Also we are iterating +through the array in Python rather than Rust native. These will **significantly** slow down your +code. Any time you have to cross the barrier where you change values inside the Rust arrays into +Python objects or vice versa you will pay **heavy** cost in that transformation. You will want to design your UDFs to avoid this as much as possible. ## Python approach using pyarrow compute @@ -227,7 +227,7 @@ our return value from the UDF needs to include arrays for which any of the value of tuples exists, so we take the result from the current loop and perform a `pyarrow.compute.or_` on it. 
-From my benchmarking, switching from approach of converting values into python objects to this +From my benchmarking, switching from approach of converting values into Python objects to this approach of using the pyarrow built-in functions leads to about a 10x speed improvement in this simple problem. @@ -239,19 +239,19 @@ need to do something akin to what we’ve shown here. This is the most complicated approach, but has the potential to be the most performant. What we will do here is write a Rust function to perform our computation and then expose that function to -python. I know of two use cases where I would recommend this approach. The first is the case when +Python. I know of two use cases where I would recommend this approach. The first is the case when the pyarrow compute functions are insufficient for your needs. Perhaps your code is too complex or could be greatly simplified if you pulled in some outside dependency. The second use case is when you have written a UDF that you’re sharing across multiple projects and have hardened the approach. -It is possible that you can implement your function in rust to give a speed improvement and then +It is possible that you can implement your function in Rust to give a speed improvement and then every project that is using this shared UDF will benefit from those updates. When deciding to use this approach, it’s worth considering how much you think you’ll actually -benefit from the rust implementation to decide if it’s worth the additional effort to maintain and -deploy the python wheels you generate. It is certainly not necessary for every use case. +benefit from the Rust implementation to decide if it’s worth the additional effort to maintain and +deploy the Python wheels you generate. It is certainly not necessary for every use case. -Due to the excellent work by the python arrow team, we can simplify our work to needing only two -dependencies on the rust side, [arrow-rs](https://github.com/apache/arrow-rs) and +Due to the excellent work by the Python arrow team, we can simplify our work to needing only two +dependencies on the Rust side, [arrow-rs](https://github.com/apache/arrow-rs) and [pyo3](https://pyo3.rs/). I have posted a [minimal example](https://github.com/timsaucer/tuple_filter_example). You’ll need [maturin](https://github.com/PyO3/maturin) to build the project, and I recommend using release mode when building. @@ -260,14 +260,14 @@ release mode when building. maturin develop --release ``` -When you write your UDF in rust you generally will need to take these steps +When you write your UDF in Rust you generally will need to take these steps -1. Write a function description that takes in some number of python generic objects. +1. Write a function description that takes in some number of Python generic objects. 2. Convert these objects to Arrow Arrays of the appropriate type(s). 3. Perform your computation and create a resultant Array. -3. Convert the array into a python generic object. +3. Convert the array into a Python generic object. -For the conversion to and from python objects, we can take advantage of the +For the conversion to and from Python objects, we can take advantage of the `ArrayData::from_pyarrow_bound` and `ArrayData::to_pyarrow` functions. All that remains is to perform your computation. @@ -276,12 +276,12 @@ done in the above approach using pyarrow. In the second we demonstrate iterating arrays ourselves. 
In our first approach, we can expect the performance to be nearly identical to when we used the -pyarrow compute functions. On the rust side we will have slightly less overhead but the heavy -lifting portions of the code are essentially the same between this rust implementation and the +pyarrow compute functions. On the Rust side we will have slightly less overhead but the heavy +lifting portions of the code are essentially the same between this Rust implementation and the pyarrow approach above. The reason for demonstrating this, even though it doesn’t provide a significant speedup over -python, is to primarily demonstrate how to make the python to rust with python wrapper +Python, is to primarily demonstrate how to make the Python to Rust with Python wrapper transition. In the second implementation you can see how we can iterate through all of the arrays ourselves. @@ -336,7 +336,7 @@ fn tuple_filter_example(module: &Bound<'_, PyModule>) -> PyResult<()> { } ``` -To use this we use the `udf` function in datafusion-python just as before. +To use this we use the `udf` function in `datafusion-python` just as before. ```python from datafusion import udf @@ -351,8 +351,8 @@ udf_using_custom_rust_fn = udf( ) ``` -That's it! We've now got a third party rust UDF with python wrappers working with DataFusion's -python bindings! +That's it! We've now got a third party Rust UDF with Python wrappers working with DataFusion's +Python bindings! ### Rust UDF with initialization @@ -362,9 +362,9 @@ the computation. The code above is sloppy, so let's clean it up. We want to write the function to take some additional data. A limitation of the UDFs we create is that they expect to operate on entire arrays of data at a time. We can get around this problem by -creating an initializer for our UDF. We do this by defining a rust struct that contains the data we +creating an initializer for our UDF. We do this by defining a Rust struct that contains the data we need and implement two methods on this struct, `new` and `__call__`. By doing this we will create a -python object that is callable, so it can be the function we provide to `udf`. +Python object that is callable, so it can be the function we provide to `udf`. ```rust #[pyclass] @@ -426,7 +426,7 @@ fn tuple_filter_example(module: &Bound<'_, PyModule>) -> PyResult<()> { When you write this, you don't have to call your constructor `new`. The more important part is that you have `#[new]` designated on the function. With this you can provide any kinds of data you need -during processing. Using this initializer in python is fairly straightforward. +during processing. Using this initializer in Python is fairly straightforward. ```python from datafusion import udf @@ -450,7 +450,7 @@ can give this udf any name you choose. ### Rust UDF with direct iteration -The final version of our scalar UDF is one where we implement it in rust and iterate through all of +The final version of our scalar UDF is one where we implement it in Rust and iterate through all of the arrays ourselves. If you are iterating through more than 3 arrays at a time I recommend looking at [izip](https://docs.rs/itertools/latest/itertools/macro.izip.html) in the [itertools crate](https://crates.io/crates/itertools). For ease of understanding and since we only @@ -582,20 +582,20 @@ times. For this simple example these are our results. +-----------------------------+--------------+---------+ ``` -As expected, the conversion to python objects is by far the worst performance. 
As soon as we drop -into using any functions that keep the data entirely on the rust side we see a near 10x speed +As expected, the conversion to Python objects is by far the worst performance. As soon as we drop +into using any functions that keep the data entirely on the Rust side we see a near 10x speed improvement. Then as we increase our complexity from using pyarrow compute functions to -implementing the UDF in rust we see incremental improvements. Our fastest approach - iterating +implementing the UDF in Rust we see incremental improvements. Our fastest approach - iterating through the arrays ourselves does operate nearly 10% faster than the pyarrow compute approach. ## Final Thoughts and Recommendations For anyone who is curious about [DataFusion](https://datafusion.apache.org/) I highly recommend -giving it a try. This post was designed to make it easier for new users to the python implementation +giving it a try. This post was designed to make it easier for new users to the Python implementation to work with User Defined Functions by giving a few examples of how one might implement these. When it comes to designing UDFs, I strongly recommend seeing if you can write your UDF using -[pyarrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure python +[pyarrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure Python objects. These will give you enormous speed benefits. If you must do something that isn't well represented by the pyarrow compute functions, then I would consider using a Rust based UDF in the manner shown above. From b03a8776a1b8962f4cb6e6f3833265855d43b13e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 12 Aug 2024 21:26:57 -0400 Subject: [PATCH 05/11] Small language adjustments --- _posts/2024-08-06-datafusion-python-udf-comparisons.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-08-06-datafusion-python-udf-comparisons.md index 16faa6f0..78dc0fb9 100644 --- a/_posts/2024-08-06-datafusion-python-udf-comparisons.md +++ b/_posts/2024-08-06-datafusion-python-udf-comparisons.md @@ -253,8 +253,8 @@ deploy the Python wheels you generate. It is certainly not necessary for every u Due to the excellent work by the Python arrow team, we can simplify our work to needing only two dependencies on the Rust side, [arrow-rs](https://github.com/apache/arrow-rs) and [pyo3](https://pyo3.rs/). I have posted a [minimal example](https://github.com/timsaucer/tuple_filter_example). -You’ll need [maturin](https://github.com/PyO3/maturin) to build the project, and I recommend using -release mode when building. +You’ll need [maturin](https://github.com/PyO3/maturin) to build the project, and you must use +release mode when building to get the expected performance. ```bash maturin develop --release @@ -504,7 +504,7 @@ impl TupleFilterDirectIterationClass { We convert the `values_of_interest` into a vector of borrowed types so that we can do a fast search without creating additional memory. The other option is to turn the `returnflag` into a `String` -but that memory allocation is unnecessary. After taht we use two `zip` operations so that we can +but that memory allocation is unnecessary. After that we use two `zip` operations so that we can iterate over all three columns in a single pass. Since each `zip` will return a tuple of two elements, a quick `map` turns them into the tuple format we need. 
Also, `StringArray` is a little different in the buffer it uses, so it is treated slightly differently from the others. @@ -602,5 +602,5 @@ manner shown above. Lastly, the Apache Arrow and DataFusion community is an active group of very helpful people working to make a great tool. If you want to get involved, please take a look at the -[online documenation](https://datafusion.apache.org/python/) and jump in to help with one of the +[online documentation](https://datafusion.apache.org/python/) and jump in to help with one of the (open issues)[https://github.com/apache/datafusion-python/issues]. From 95a4f8650cf313a394c80a44c2e0463f9a738646 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 12 Aug 2024 21:32:54 -0400 Subject: [PATCH 06/11] Add more thorough description of the problem --- .../2024-08-06-datafusion-python-udf-comparisons.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-08-06-datafusion-python-udf-comparisons.md index 78dc0fb9..068dbdb4 100644 --- a/_posts/2024-08-06-datafusion-python-udf-comparisons.md +++ b/_posts/2024-08-06-datafusion-python-udf-comparisons.md @@ -83,10 +83,15 @@ Here are the two example use cases, taken from my own work but generalized. ### Use Case 1: Scalar Function I have a DataFrame and a list of tuples that I’m interested in. I want to filter out the DataFrame -to only have values that match those tuples from certain columns in the DataFrame. For example, -suppose I have a table of sales line items. There are many columns, but I am interested in three: a -part key, supplier key, and return status. I want only to return a DataFrame with a specific -combination of these three values. +to only have values that match those tuples from certain columns in the DataFrame. + +To give a concrete example, we will use data generated for the [TPC-H benchmarks](https://www.tpc.org/tpch/). +Suppose I have a table of sales line items. There are many columns, but I am interested in three: a +part key (`p_partkey`), supplier key (`p_suppkey`), and return status (`p_returnflag`). I want +only to return a DataFrame with a specific combination of these three values. That is, I want +to know if part number 1530 from supplier 4031 was sold (not returned), so I want a specific +combination of `p_partkey = 1530`, `p_suppkey = 4031`, and `p_returnflag = 'N'`. I have a small +handful of these combinations I want to return. Probably the most ergonomic way to do this without UDF is to turn that list of tuples into a DataFrame itself, perform a join, and select the columns from the original DataFrame. If we were From 1fefd6b84541be4d005852b71c2faef4e8a26572 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 19 Nov 2024 09:29:36 -0500 Subject: [PATCH 07/11] Variety of small readability improvements --- ...08-06-datafusion-python-udf-comparisons.md | 64 +++++++++++-------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-08-06-datafusion-python-udf-comparisons.md index 068dbdb4..b65e8dbb 100644 --- a/_posts/2024-08-06-datafusion-python-udf-comparisons.md +++ b/_posts/2024-08-06-datafusion-python-udf-comparisons.md @@ -30,8 +30,8 @@ limitations under the License. For a few months now I’ve been working with [Apache DataFusion](https://datafusion.apache.org/), a fast query engine written in Rust. From my experience the language that nearly all data scientists -are working in is Python. 
In general, often stick to [pandas](https://pandas.pydata.org/) for -in-memory tasks and [pyspark](https://spark.apache.org/) for larger tasks that require distributed +are working in is Python. In general, often stick to [Pandas](https://pandas.pydata.org/) for +in-memory tasks and [PySpark](https://spark.apache.org/) for larger tasks that require distributed processing. In addition to DataFusion, there is another Rust based newcomer to the DataFrame world, @@ -42,7 +42,8 @@ recommend evaluating Polars for in memory work. Personally, I would love a single query approach that is fast for both in memory usage and can extend to large batch processing to exploit parallelization. I think DataFusion, coupled with -Ballista, may provide this solution. +[Ballista](https://datafusion.apache.org/ballista/) or +[DataFusion-Ray](https://github.com/apache/datafusion-ray), may provide this solution. As I’m testing, I’m primarily limiting my work to the [datafusion-python](https://datafusion.apache.org/python/) project, a wrapper around the Rust @@ -65,16 +66,16 @@ functions to achieve your goals. In the following, I’m going to demonstrate two example use cases. These are based on real world problems I’ve encountered. Also I want to demonstrate the approach of “make it work, make it work -well, make it fast” that is a motto I’ve seen thrown around in data science. +well, make it work fast” that is a motto I’ve seen thrown around in data science. I will demonstrate three approaches to writing UDFs. In order of increasing performance they are - Writing a pure Python function to do your computation -- Using the pyarrow libraries in Python to accelerate your processing +- Using the PyArrow libraries in Python to accelerate your processing - Writing a UDF in Rust and exposing it to Python Additionally I will demonstrate two variants of this. The first will be nearly identical to the -pyarrow library approach to simplicity of understanding how to connect the Rust code to Python. The +PyArrow library approach to simplicity of understanding how to connect the Rust code to Python. The second version we will do the iteration through the input arrays ourselves to give even greater flexibility to the user. @@ -95,11 +96,11 @@ handful of these combinations I want to return. Probably the most ergonomic way to do this without UDF is to turn that list of tuples into a DataFrame itself, perform a join, and select the columns from the original DataFrame. If we were -working in pyspark we would probably broadcast join the DataFrame created from the tuple list since +working in PySpark we would probably broadcast join the DataFrame created from the tuple list since it is tiny. In practice, I have found that with some DataFrame libraries performing a filter rather than a join can be significantly faster. This is worth profiling for your specific use case. -### Use Case 2: Aggregate or Window Function +### Use Case 2: Aggregate Function I have a DataFrame with many values that I want to aggregate. I have already analyzed it and determined there is a noise level below which I do not want to include in my analysis. I want to @@ -108,7 +109,7 @@ compute a sum of only values that are above my noise threshold. This can be done fairly easy without leaning on a User Defined Aggegate Function (UDAF). You can simply filter the DataFrame and then aggregate using the built-in `sum` function. Here, we demonstrate doing this as a UDF primarily as an example of how to write UDAFs. We will use the -pyarrow compute approach. 
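As a point of comparison, here is a sketch of that simpler non-UDAF version. It assumes the same
hypothetical `df_orders` table used in the aggregate example later in this post.

```python
from datafusion import col, lit, functions as f

# Filter out the noise first, then lean on the built-in sum aggregation
df_orders.filter(col("o_totalprice") > lit(5000.0)).aggregate(
    [col("o_custkey")],
    [f.sum(col("o_totalprice")).alias("sales")],
).show()
```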

-### Use Case 2: Aggregate or Window Function
+### Use Case 2: Aggregate Function

I have a DataFrame with many values that I want to aggregate. I have already analyzed it and
determined there is a noise level below which I do not want to include in my analysis. I want to
@@ -108,7 +109,7 @@ compute a sum of only values that are above my noise threshold.

This can be done fairly easily without leaning on a User Defined Aggregate Function (UDAF). You can
simply filter the DataFrame and then aggregate using the built-in `sum` function. Here, we
demonstrate doing this as a UDF primarily as an example of how to write UDAFs. We will use the
-pyarrow compute approach.
+PyArrow compute approach.

## Pure Python approach

@@ -158,7 +159,7 @@ df_udf_filter = df_lineitem.filter(

When working with a DataFusion UDF in Python, you define your function to take in some number of
expressions. During the evaluation, these will get computed into their corresponding values and
-passed to your UDF as a pyarrow Array. We must return an Array also with the same number of
+passed to your UDF as a PyArrow Array. We must also return an Array with the same number of
elements (rows). So the UDF example just iterates through all of the arrays and checks to see if
the tuple created from these columns matches any of those that we’re looking for.

@@ -176,11 +177,11 @@ code. Any time you have to cross the barrier where you change values inside the
Python objects or vice versa you will pay a **heavy** cost in that transformation. You will want to
design your UDFs to avoid this as much as possible.

-## Python approach using pyarrow compute
+## Python approach using PyArrow compute

DataFusion uses [Apache Arrow](https://arrow.apache.org/) as its in-memory data format. This can
be seen in the way that Arrow Arrays are passed into the UDFs. We can take advantage of the fact
-that [pyarrow](https://arrow.apache.org/docs/python/), the canonical Python Arrow implementation,
+that [PyArrow](https://arrow.apache.org/docs/python/), the canonical Python Arrow implementation,
provides a variety of useful functions. In the example below, we are only using a few of the
boolean functions and the equality function. Each of these functions takes two arrays and analyzes
them row by row. In the
@@ -233,10 +234,10 @@ of tuples exists, so we take the result from the current loop and perform a `pya
on it.

From my benchmarking, switching from the approach of converting values into Python objects to this
-approach of using the pyarrow built-in functions leads to about a 10x speed improvement in this
+approach of using the PyArrow built-in functions leads to about a 10x speed improvement in this
simple problem.

-It’s worth noting that almost all of the pyarrow compute functions expect to take one or two arrays
+It’s worth noting that almost all of the PyArrow compute functions expect to take one or two arrays
as their arguments. If you need to write a UDF that is evaluating three or more columns, you’ll
need to do something akin to what we’ve shown here.
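+
+As a rough sketch of what that chaining looks like (this is illustrative, not the exact code from
+the example above), a three-column check can be folded together two arrays at a time:
+
+```python
+import pyarrow as pa
+import pyarrow.compute as pc
+
+def matches_single_tuple(
+    partkey_arr: pa.Array, suppkey_arr: pa.Array, returnflag_arr: pa.Array
+) -> pa.Array:
+    # pc.and_ combines only two boolean arrays at a time, so the three
+    # per-column equality checks are folded together pairwise.
+    mask = pc.and_(
+        pc.equal(partkey_arr, pa.scalar(1530)),
+        pc.equal(suppkey_arr, pa.scalar(4031)),
+    )
+    return pc.and_(mask, pc.equal(returnflag_arr, pa.scalar("N")))
+```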

@@ -245,7 +246,7 @@

This is the most complicated approach, but has the potential to be the most performant. What we
will do here is write a Rust function to perform our computation and then expose that function to
Python. I know of two use cases where I would recommend this approach. The first is the case when
-the pyarrow compute functions are insufficient for your needs. Perhaps your code is too complex or
+the PyArrow compute functions are insufficient for your needs. Perhaps your code is too complex or
could be greatly simplified if you pulled in some outside dependency. The second use case is when
you have written a UDF that you’re sharing across multiple projects and have hardened the approach.
It is possible that you can implement your function in Rust to give a speed improvement and then
@@ -277,19 +278,22 @@ For the conversion to and from Python objects, we can take advantage of the
perform your computation.

We are going to demonstrate doing this computation in two ways. The first is to mimic what we’ve
-done in the above approach using pyarrow. In the second we demonstrate iterating through the three
+done in the above approach using PyArrow. In the second we demonstrate iterating through the three
arrays ourselves.

In our first approach, we can expect the performance to be nearly identical to when we used the
-pyarrow compute functions. On the Rust side we will have slightly less overhead but the heavy
+PyArrow compute functions. On the Rust side we will have slightly less overhead but the heavy
lifting portions of the code are essentially the same between this Rust implementation and the
-pyarrow approach above.
+PyArrow approach above.

The reason for demonstrating this, even though it doesn’t provide a significant speedup over
Python, is primarily to demonstrate how to make the transition from Python to Rust with a Python
wrapper.

In the second implementation you can see how we can iterate through all of the arrays ourselves.

+In this first example, we are hard coding the values of interest, but in the following section
+we demonstrate passing these in during initialization.
+
```rust
#[pyfunction]
pub fn tuple_filter_fn(
@@ -527,7 +531,7 @@ batch will contain off the values we are aggregating over. For this we need to d
- Return the final result

In the example below, we're going to look at customer orders and we want to know, per customer ID,
-how much they have ordered total. We want to ignore small orders, which we define as anything over
+how much they have ordered in total. We want to ignore small orders, which we define as anything under
5000.

```python
import datafusion
import pyarrow as pa
import pyarrow.compute as pc

IGNORE_THESHOLD = 5000.0
-class AboveThersholdAccum(Accumulator):
+class AboveThresholdAccum(Accumulator):
    def __init__(self) -> None:
        self._sum = 0.0

@@ -556,7 +560,7 @@ class AboveThersholdAccum(Accumulator):
    def evaluate(self) -> pa.Scalar:
        return pa.scalar(self._sum)

-sum_above_threshold = udaf(AboveThersholdAccum, [pa.float64()], pa.float64(), [pa.float64()], 'stable')
+sum_above_threshold = udaf(AboveThresholdAccum, [pa.float64()], pa.float64(), [pa.float64()], 'stable')

df_orders.aggregate([col("o_custkey")],[sum_above_threshold(col("o_totalprice")).alias("sales")]).show()
```

@@ -568,6 +572,14 @@ function. For larger batches we may `merge()` these states. It is important to n
entirely possible that the `merge` function is significantly different than the `update`, though in
our example they are very similar.

+## User Defined Window Functions
+
+Writing a user defined window function is slighlty more complex than an aggregate function due
+to the variety of ways that window functions are called. I recommend reviewing the
+[online documentation](https://datafusion.apache.org/python/user-guide/common-operations/udf-and-udfa.html)
+for a description of which functions need to be implemented. The details of how to implement
+these generally follow the same patterns as described above for aggregate functions.
+
## Performance Comparison

For the scalar functions above, we performed a timing evaluation, repeating the operation 100
@@ -589,9 +601,9 @@ times. For this simple example these are our results.

```

As expected, the conversion to Python objects is by far the worst performance. As soon as we drop
into using any functions that keep the data entirely on the Rust side we see a near 10x speed
-improvement. Then as we increase our complexity from using pyarrow compute functions to
+improvement.
Then as we increase our complexity from using PyArrow compute functions to implementing the UDF in Rust we see incremental improvements. Our fastest approach - iterating -through the arrays ourselves does operate nearly 10% faster than the pyarrow compute approach. +through the arrays ourselves does operate nearly 10% faster than the PyArrow compute approach. ## Final Thoughts and Recommendations @@ -600,12 +612,12 @@ giving it a try. This post was designed to make it easier for new users to the P to work with User Defined Functions by giving a few examples of how one might implement these. When it comes to designing UDFs, I strongly recommend seeing if you can write your UDF using -[pyarrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure Python +[PyArrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure Python objects. These will give you enormous speed benefits. If you must do something that isn't well -represented by the pyarrow compute functions, then I would consider using a Rust based UDF in the +represented by the PyArrow compute functions, then I would consider using a Rust based UDF in the manner shown above. Lastly, the Apache Arrow and DataFusion community is an active group of very helpful people working to make a great tool. If you want to get involved, please take a look at the [online documentation](https://datafusion.apache.org/python/) and jump in to help with one of the -(open issues)[https://github.com/apache/datafusion-python/issues]. +[open issues](https://github.com/apache/datafusion-python/issues). From 91b8817d1205a0274eb8dafaa6684f6a51eeceab Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 19 Nov 2024 09:30:29 -0500 Subject: [PATCH 08/11] Update date --- ...isons.md => 2024-11-19-datafusion-python-udf-comparisons.md} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename _posts/{2024-08-06-datafusion-python-udf-comparisons.md => 2024-11-19-datafusion-python-udf-comparisons.md} (99%) diff --git a/_posts/2024-08-06-datafusion-python-udf-comparisons.md b/_posts/2024-11-19-datafusion-python-udf-comparisons.md similarity index 99% rename from _posts/2024-08-06-datafusion-python-udf-comparisons.md rename to _posts/2024-11-19-datafusion-python-udf-comparisons.md index b65e8dbb..f62c861d 100644 --- a/_posts/2024-08-06-datafusion-python-udf-comparisons.md +++ b/_posts/2024-11-19-datafusion-python-udf-comparisons.md @@ -1,7 +1,7 @@ --- layout: post title: "Comparing approaches to User Defined Functions in Apache DataFusion using Python" -date: "2024-08-06 00:00:00" +date: "2024-11-19 00:00:00" author: timsaucer categories: [tutorial] --- From 1b483aaa89282306d2beeb9c9bcf3fa5b8933c7d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 19 Nov 2024 09:33:13 -0500 Subject: [PATCH 09/11] Small typo --- _posts/2024-11-19-datafusion-python-udf-comparisons.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_posts/2024-11-19-datafusion-python-udf-comparisons.md b/_posts/2024-11-19-datafusion-python-udf-comparisons.md index f62c861d..d71ffc7d 100644 --- a/_posts/2024-11-19-datafusion-python-udf-comparisons.md +++ b/_posts/2024-11-19-datafusion-python-udf-comparisons.md @@ -522,7 +522,7 @@ different in the buffer it uses, so it is treated slightly differently from the Writing a user defined aggregate function or user defined window function is slightly more complex than scalar functions. 
This is because we must accumulate values and there is no guarantee that one -batch will contain off the values we are aggregating over. For this we need to define an +batch will contain all the values we are aggregating over. For this we need to define an `Accumulator` which will do a few things. - Process a batch and compute an internal state From ca13b631af3d9efc53bd35c4c54a758f4308f35f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 19 Nov 2024 09:52:43 -0500 Subject: [PATCH 10/11] Small change from in memory to in-memory --- _posts/2024-11-19-datafusion-python-udf-comparisons.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/_posts/2024-11-19-datafusion-python-udf-comparisons.md b/_posts/2024-11-19-datafusion-python-udf-comparisons.md index d71ffc7d..3bb27eb8 100644 --- a/_posts/2024-11-19-datafusion-python-udf-comparisons.md +++ b/_posts/2024-11-19-datafusion-python-udf-comparisons.md @@ -38,9 +38,9 @@ In addition to DataFusion, there is another Rust based newcomer to the DataFrame [Polars](https://pola.rs/). It is growing extremely fast, and it serves many of the same use cases as DataFusion. For my use cases, I'm interested in DataFusion because I want to be able to build small scale tests rapidly and then scale them up to larger distributed systems with ease. I do -recommend evaluating Polars for in memory work. +recommend evaluating Polars for in-memory work. -Personally, I would love a single query approach that is fast for both in memory usage and can +Personally, I would love a single query approach that is fast for both in-memory usage and can extend to large batch processing to exploit parallelization. I think DataFusion, coupled with [Ballista](https://datafusion.apache.org/ballista/) or [DataFusion-Ray](https://github.com/apache/datafusion-ray), may provide this solution. From a0ba495e554445a0a05ed50c0fdf2818fb3824f9 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 19 Nov 2024 12:14:16 -0500 Subject: [PATCH 11/11] Resolve remaining feedback --- ...11-19-datafusion-python-udf-comparisons.md | 45 ++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/_posts/2024-11-19-datafusion-python-udf-comparisons.md b/_posts/2024-11-19-datafusion-python-udf-comparisons.md index 3bb27eb8..7420116a 100644 --- a/_posts/2024-11-19-datafusion-python-udf-comparisons.md +++ b/_posts/2024-11-19-datafusion-python-udf-comparisons.md @@ -30,15 +30,15 @@ limitations under the License. For a few months now I’ve been working with [Apache DataFusion](https://datafusion.apache.org/), a fast query engine written in Rust. From my experience the language that nearly all data scientists -are working in is Python. In general, often stick to [Pandas](https://pandas.pydata.org/) for -in-memory tasks and [PySpark](https://spark.apache.org/) for larger tasks that require distributed -processing. +are working in is Python. In general, data scientists often use [Pandas](https://pandas.pydata.org/) +for in-memory tasks and [PySpark](https://spark.apache.org/) for larger tasks that require +distributed processing. In addition to DataFusion, there is another Rust based newcomer to the DataFrame world, -[Polars](https://pola.rs/). It is growing extremely fast, and it serves many of the same use cases -as DataFusion. For my use cases, I'm interested in DataFusion because I want to be able to build -small scale tests rapidly and then scale them up to larger distributed systems with ease. I do -recommend evaluating Polars for in-memory work. 
+[Polars](https://pola.rs/). The latter is growing extremely fast, and it serves many of the same
+use cases as DataFusion. For my use cases, I'm interested in DataFusion because I want to be able
+to build small scale tests rapidly and then scale them up to larger distributed systems with ease.
+I do recommend evaluating Polars for in-memory work.

Personally, I would love a single query approach that is fast for both in-memory usage and can
extend to large batch processing to exploit parallelization. I think DataFusion, coupled with
@@ -75,7 +75,7 @@ I will demonstrate three approaches to writing UDFs. In order of increasing perf
- Writing a UDF in Rust and exposing it to Python

Additionally, I will demonstrate two variants of this. The first will be nearly identical to the
-PyArrow library approach to simplicity of understanding how to connect the Rust code to Python. The
+PyArrow library approach to simplify understanding how to connect the Rust code to Python. In the
second version we will do the iteration through the input arrays ourselves to give even greater
flexibility to the user.

@@ -572,9 +572,14 @@ function. For larger batches we may `merge()` these states. It is important to n
entirely possible that the `merge` function is significantly different than the `update`, though in
our example they are very similar.

+One example of implementing a user defined aggregate function where the `update()` and `merge()`
+operations are different is computing an average. In `update()` we would create a state that is both
+a sum and a count. `state()` would return a list of these two values, `merge()` would combine the
+sums and counts coming from other partial states, and `evaluate()` would divide the total sum by
+the total count to produce the final average.
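+
+A minimal sketch of such an accumulator is shown below. It assumes the same `Accumulator`
+interface as the threshold example earlier in this post; the class name is illustrative, the
+exact `state()`/`merge()` signatures should be checked against your installed version of
+datafusion-python, and the handling of empty or all-null input is simplified.
+
+```python
+from typing import List
+
+import pyarrow as pa
+import pyarrow.compute as pc
+from datafusion import Accumulator, udaf
+
+class AvgAccum(Accumulator):
+    def __init__(self) -> None:
+        self._sum = 0.0
+        self._count = 0.0
+
+    def update(self, values: pa.Array) -> None:
+        # Accumulate both parts of the state. pc.sum() returns null for an
+        # all-null array, hence the fallback to 0.0.
+        self._sum += pc.sum(values).as_py() or 0.0
+        self._count += len(values) - values.null_count
+
+    def state(self) -> List[pa.Scalar]:
+        # The partial state is the (sum, count) pair.
+        return [pa.scalar(self._sum), pa.scalar(self._count)]
+
+    def merge(self, states: List[pa.Array]) -> None:
+        # Combine partial states from other batches. Unlike update(), this
+        # adds previously accumulated sums and counts, not raw values.
+        self._sum += pc.sum(states[0]).as_py() or 0.0
+        self._count += pc.sum(states[1]).as_py() or 0.0
+
+    def evaluate(self) -> pa.Scalar:
+        # Only evaluate() produces the final average.
+        return pa.scalar(self._sum / self._count)
+
+compute_avg = udaf(AvgAccum, [pa.float64()], pa.float64(), [pa.float64(), pa.float64()], 'stable')
+```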
+
## User Defined Window Functions

-Writing a user defined window function is slighlty more complex than an aggregate function due
+Writing a user defined window function is slightly more complex than an aggregate function due
to the variety of ways that window functions are called. I recommend reviewing the
@@ -600,9 +605,9 @@ times. For this simple example these are our results.

```

As expected, the conversion to Python objects is by far the worst performance. As soon as we drop
-into using any functions that keep the data entirely on the Rust side we see a near 10x speed
-improvement. Then as we increase our complexity from using PyArrow compute functions to
-implementing the UDF in Rust we see incremental improvements. Our fastest approach - iterating
+into using any functions that keep the data entirely on the native (Rust or C/C++) side we see a
+near 10x speed improvement. Then as we increase our complexity from using PyArrow compute functions
+to implementing the UDF in Rust we see incremental improvements. Our fastest approach, iterating
through the arrays ourselves, operates nearly 10% faster than the PyArrow compute approach.
@@ -613,11 +618,21 @@ to work with User Defined Functions by giving a few examples of how one might im

When it comes to designing UDFs, I strongly recommend seeing if you can write your UDF using
[PyArrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure Python
-objects. These will give you enormous speed benefits. If you must do something that isn't well
-represented by the PyArrow compute functions, then I would consider using a Rust based UDF in the
-manner shown above.
+objects. As shown in the scalar example above, you can achieve a 10x speedup by using PyArrow
+functions. If you must do something that isn't well represented by the PyArrow compute functions,
+then I would consider using a Rust based UDF in the manner shown above.
+
+I would like to thank [@alamb], [@andygrove], [@comphead], [@emgeee], [@kylebarron], and [@Omega359]
+for their helpful reviews and feedback.

Lastly, the Apache Arrow and DataFusion community is an active group of very helpful people
working to make a great tool. If you want to get involved, please take a look at the
[online documentation](https://datafusion.apache.org/python/) and jump in to help with one of the
[open issues](https://github.com/apache/datafusion-python/issues).
+
+[@kylebarron]: https://github.com/kylebarron
+[@emgeee]: https://github.com/emgeee
+[@Omega359]: https://github.com/Omega359
+[@comphead]: https://github.com/comphead
+[@andygrove]: https://github.com/andygrove
+[@alamb]: https://github.com/alamb