Merged

26 commits
f54fd1c
updating submodule
vibhatha Mar 4, 2022
98cd453
temp commit to remove files in submodule
vibhatha Mar 19, 2022
a72cf54
adding submodule
vibhatha Mar 19, 2022
7c0d11e
updating testing submodule
vibhatha Mar 20, 2022
25862d8
revert to upstream version
vibhatha Mar 20, 2022
06e06e8
adding initial scalar udf docs for python
vibhatha Jul 22, 2022
79eedd8
update warnings
vibhatha Jul 22, 2022
88d46cc
update udf description
vibhatha Jul 22, 2022
0497de0
grammar fix
vibhatha Jul 22, 2022
100ad56
update udf function doc
vibhatha Jul 22, 2022
b0221c9
update udf func doc description
vibhatha Jul 22, 2022
eec2473
update description
vibhatha Jul 22, 2022
c48c90a
update how the udf can be called with datasets
vibhatha Jul 22, 2022
60abbc6
update info on context
vibhatha Jul 22, 2022
d803a4f
adding adequate memory
vibhatha Jul 22, 2022
b4959a7
adding a new example to showcase the scalar limitations
vibhatha Jul 22, 2022
93323d9
updating example and rebase
vibhatha Jul 22, 2022
feb6bb6
adding docs
vibhatha Jul 22, 2022
1203d80
addressing review comments
vibhatha Jul 23, 2022
6a04e09
fix(reviews): address reviews
vibhatha Jul 28, 2022
f702c51
fix(reviews): updated write up and example
vibhatha Aug 26, 2022
e86fadd
fix(reviews): replaced affine function with gcd and update docs
vibhatha Sep 16, 2022
4aa2902
fix(reviews): updated review comments
vibhatha Sep 22, 2022
7e88faa
fix(reviews): updated the docs and code
vibhatha Sep 24, 2022
227db70
fix(submodule)
vibhatha Sep 28, 2022
a8555bf
fix(reviews): doc update
vibhatha Sep 28, 2022
9 changes: 9 additions & 0 deletions docs/source/python/api/compute.rst
@@ -555,3 +555,12 @@ Compute Options
TrimOptions
VarianceOptions
WeekOptions

User-Defined Functions
----------------------

.. autosummary::
   :toctree: ../generated/

   register_scalar_function
   ScalarUdfContext
132 changes: 132 additions & 0 deletions docs/source/python/compute.rst
@@ -370,3 +370,135 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter:

A :class:`.Dataset` can currently be filtered using the :meth:`.Dataset.to_table` method
by passing a ``filter`` argument. See :ref:`py-filter-dataset` in the Dataset documentation.


User-Defined Functions
======================

.. warning::
   This API is **experimental**.

PyArrow allows defining and registering custom compute functions.
These functions can then be called from Python as well as C++ (and potentially
any other implementation wrapping Arrow C++, such as the R ``arrow`` package)
using their registered function name.

UDF support is limited to scalar functions. A scalar function is a function that
performs elementwise operations on arrays or scalars; in general, the output of a
scalar function does not depend on the order of values in its arguments. Such
functions correspond roughly to the functions used in SQL expressions, or to NumPy
`universal functions <https://numpy.org/doc/stable/reference/ufuncs.html>`_.
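
To make "elementwise" concrete, here is how a built-in scalar function such as
``add`` behaves: each output element depends only on the corresponding input
elements, and scalars broadcast against arrays much like NumPy ufunc operands.

.. code-block:: python

   import pyarrow as pa
   import pyarrow.compute as pc

   # A built-in scalar function: one output element per input element.
   arr = pa.array([3, 6, 9])
   print(pc.add(arr, 1))                  # elementwise: [4, 7, 10]

   # Scalars broadcast against arrays, as with NumPy ufuncs.
   print(pc.multiply(arr, pa.scalar(2)))  # [6, 12, 18]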

To register a UDF, you need to define a function name, function documentation,
input types and an output type, and then pass them to
:func:`pyarrow.compute.register_scalar_function`:

.. code-block:: python

   import numpy as np

   import pyarrow as pa
   import pyarrow.compute as pc

   function_name = "numpy_gcd"
   function_docs = {
       "summary": "Calculates the greatest common divisor",
       "description":
           "Given 'x' and 'y' find the greatest number that divides\n"
           "evenly into both x and y."
   }

   input_types = {
       "x": pa.int64(),
       "y": pa.int64()
   }

   output_type = pa.int64()

   def to_np(val):
       if isinstance(val, pa.Scalar):
           return val.as_py()
       else:
           return np.array(val)

   def gcd_numpy(ctx, x, y):
       np_x = to_np(x)
       np_y = to_np(y)
       return pa.array(np.gcd(np_x, np_y))

   pc.register_scalar_function(gcd_numpy,
                               function_name,
                               function_docs,
                               input_types,
                               output_type)


The implementation of a user-defined function always takes a first *context*
parameter (named ``ctx`` in the example above) which is an instance of
:class:`pyarrow.compute.ScalarUdfContext`.
This context exposes several useful attributes, particularly a
:attr:`~pyarrow.compute.ScalarUdfContext.memory_pool` to be used for
allocations in the context of the user-defined function.
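
As an illustration of using the context, here is a minimal sketch of a UDF that
routes its allocation through the supplied pool. The function name
``add_one_example`` is ours, chosen for this sketch, and is not part of PyArrow.

.. code-block:: python

   import pyarrow as pa
   import pyarrow.compute as pc

   def add_one(ctx, x):
       # Allocate the result from the memory pool the engine supplies
       # through the context, rather than from the default pool.
       return pa.array([v + 1 for v in x.to_pylist()],
                       type=pa.int64(),
                       memory_pool=ctx.memory_pool)

   pc.register_scalar_function(
       add_one, "add_one_example",
       {"summary": "Adds one to each element",
        "description": "Adds one to each element of 'x'."},
       {"x": pa.int64()}, pa.int64())

   print(pc.call_function("add_one_example", [pa.array([1, 2, 3])]))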

You can call a user-defined function directly using :func:`pyarrow.compute.call_function`:

.. code-block:: python

   >>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
   <pyarrow.Int64Scalar: 9>
   >>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
   <pyarrow.lib.Int64Array object at 0x7fcfa0e7b100>
   [
     27,
     3,
     1
   ]
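
A registered UDF lives in the same function registry as the built-in functions,
so it can also be looked up and invoked via :func:`pyarrow.compute.get_function`.
A sketch, using a hypothetical ``py_double`` UDF defined here for illustration:

.. code-block:: python

   import pyarrow as pa
   import pyarrow.compute as pc

   # A minimal UDF registered under the (hypothetical) name "py_double".
   def py_double(ctx, x):
       return pc.multiply(x, pa.scalar(2, pa.int64()))

   pc.register_scalar_function(
       py_double, "py_double",
       {"summary": "Doubles each element",
        "description": "Multiplies each element of 'x' by two."},
       {"x": pa.int64()}, pa.int64())

   # Retrieve the UDF from the registry, exactly like a built-in.
   func = pc.get_function("py_double")
   print(func.name)                          # 'py_double'
   print(func.call([pa.array([1, 2, 3])]))   # [2, 4, 6]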

Working with Datasets
---------------------

More generally, user-defined functions are usable everywhere a compute function
can be referred to by its name. For example, they can be called on a dataset's
column using :meth:`Expression._call`.

Consider a case where the data is in a table and we want to compute
the GCD of one of its columns with the scalar value 30, reusing the
"numpy_gcd" user-defined function created above:

.. code-block:: python

   >>> import pyarrow.dataset as ds
   >>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'], 'value': [90, 630, 1827, 2709]})
   >>> dataset = ds.dataset(data_table)
   >>> func_args = [pc.scalar(30), ds.field("value")]
   >>> dataset.to_table(
   ...     columns={
   ...         'gcd_value': ds.field('')._call("numpy_gcd", func_args),
   ...         'value': ds.field('value'),
   ...         'category': ds.field('category')
   ...     })
   pyarrow.Table
   gcd_value: int64
   value: int64
   category: string
   ----
   gcd_value: [[30,30,3,3]]
   value: [[90,630,1827,2709]]
   category: [["A","B","C","D"]]

Note that ``ds.field('')._call(...)`` returns a :class:`pyarrow.compute.Expression`.
The arguments passed to this function call are expressions, not scalar values
(notice the difference between :func:`pyarrow.scalar` and :func:`pyarrow.compute.scalar`:
the latter produces an expression).
This expression is evaluated when the projection operator executes it.
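
The difference between the two ``scalar`` factories can be seen directly:

.. code-block:: python

   import pyarrow as pa
   import pyarrow.compute as pc

   # pyarrow.scalar produces a concrete value...
   print(type(pa.scalar(30)))   # <class 'pyarrow.lib.Int64Scalar'>

   # ...while pyarrow.compute.scalar produces an Expression, which is
   # only evaluated when a projection or filter actually runs.
   print(type(pc.scalar(30)))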

Projection Expressions
^^^^^^^^^^^^^^^^^^^^^^
Member: Is this paragraph describing the definition of a scalar function, or is that something different? Either way, I think we want to say so explicitly ("This is what it means to be a scalar function" / "These requirements are distinct from the definition of a scalar function").

Contributor (author): Ah, not exactly. It was added as a subsection to clarify what a projection expression does and how it can be used with UDFs.

Member: I think we should also mention somewhere (closer to the beginning of the new section) that UDFs are currently limited to scalar functions, and then also explain what a scalar function is. That would make it easier to refer to that concept here and say that projections currently only support scalar functions.

Contributor (author): Yeah, that makes sense. Will update.

Contributor (author): I added a section on the limitations and the scalar function definition. I merely rephrased what is in the docs, to be precise; I didn't want to introduce anything extra.

In the above example we used an expression to add a new column (``gcd_value``)
to our table. Adding new, dynamically computed columns to a table is known as
"projection", and there are limitations on what kinds of functions can be used
in projection expressions. A projection function must emit a single output value
for each input row; that output value should be calculated entirely from the
input row and should not depend on any other row. For example, the "numpy_gcd"
function used above is a valid projection function. A "cumulative sum" function
would not be valid, since the result for each input row depends on the rows that
came before. A "drop nulls" function would also be invalid, because it does not
emit a value for some rows.
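
As a sketch of a valid projection using the same ``_call`` pattern as the
dataset example above, but with the built-in "multiply" scalar function and a
fresh table (the column name ``doubled`` is ours):

.. code-block:: python

   import pyarrow as pa
   import pyarrow.dataset as ds
   import pyarrow.compute as pc

   table = pa.table({'value': [90, 630, 1827, 2709]})
   dataset = ds.dataset(table)

   # "multiply" emits exactly one output per input row, so it is a valid
   # projection function; a cumulative sum or a drop-nulls kernel is not.
   result = dataset.to_table(columns={
       'value': ds.field('value'),
       'doubled': ds.field('')._call("multiply",
                                     [ds.field('value'), pc.scalar(2)]),
   })
   print(result)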
1 change: 1 addition & 0 deletions python/pyarrow/src/udf.h
@@ -41,6 +41,7 @@ struct ARROW_PYTHON_EXPORT ScalarUdfOptions {
std::shared_ptr<DataType> output_type;
};

/// \brief A context passed as the first argument of scalar UDF functions.
struct ARROW_PYTHON_EXPORT ScalarUdfContext {
MemoryPool* pool;
int64_t batch_length;