diff --git a/docs/source/python/api/compute.rst b/docs/source/python/api/compute.rst
index 4a9208fd31b..574bcbc659f 100644
--- a/docs/source/python/api/compute.rst
+++ b/docs/source/python/api/compute.rst
@@ -555,3 +555,12 @@ Compute Options
    TrimOptions
    VarianceOptions
    WeekOptions
+
+User-Defined Functions
+----------------------
+
+.. autosummary::
+   :toctree: ../generated/
+
+   register_scalar_function
+   ScalarUdfContext
diff --git a/docs/source/python/compute.rst b/docs/source/python/compute.rst
index bcbca9dff36..fe7f333300f 100644
--- a/docs/source/python/compute.rst
+++ b/docs/source/python/compute.rst
@@ -370,3 +370,135 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter:
 :class:`.Dataset` currently can be filtered using :meth:`.Dataset.to_table`
 method passing a ``filter`` argument.
 See :ref:`py-filter-dataset` in Dataset documentation.
+
+
+User-Defined Functions
+======================
+
+.. warning::
+   This API is **experimental**.
+
+PyArrow allows defining and registering custom compute functions.
+These functions can then be called from Python as well as C++ (and potentially
+any other implementation wrapping Arrow C++, such as the R ``arrow`` package)
+using their registered function name.
+
+UDF support is limited to scalar functions. A scalar function is a function
+which executes elementwise operations on arrays or scalars. In general, the
+output of a scalar function does not depend on the order of values in the
+arguments. Note that such functions have a rough correspondence to the
+functions used in SQL expressions, or to NumPy
+`universal functions <https://numpy.org/doc/stable/reference/ufuncs.html>`_.
+
+To register a UDF, a function name, function docs, input types and an output
+type need to be defined and passed to
+:func:`pyarrow.compute.register_scalar_function`:
+
+.. code-block:: python
+
+   import numpy as np
+
+   import pyarrow as pa
+   import pyarrow.compute as pc
+
+   function_name = "numpy_gcd"
+   function_docs = {
+       "summary": "Calculates the greatest common divisor",
+       "description":
+           "Given 'x' and 'y' find the greatest number that divides\n"
+           "evenly into both x and y."
+   }
+
+   input_types = {
+       "x": pa.int64(),
+       "y": pa.int64()
+   }
+
+   output_type = pa.int64()
+
+   def to_np(val):
+       # Unwrap scalars to Python values; convert arrays to NumPy arrays.
+       if isinstance(val, pa.Scalar):
+           return val.as_py()
+       else:
+           return np.array(val)
+
+   def gcd_numpy(ctx, x, y):
+       np_x = to_np(x)
+       np_y = to_np(y)
+       return pa.array(np.gcd(np_x, np_y))
+
+   pc.register_scalar_function(gcd_numpy,
+                               function_name,
+                               function_docs,
+                               input_types,
+                               output_type)
+
+
+The implementation of a user-defined function always takes a first *context*
+parameter (named ``ctx`` in the example above) which is an instance of
+:class:`pyarrow.compute.ScalarUdfContext`.
+This context exposes several useful attributes, particularly a
+:attr:`~pyarrow.compute.ScalarUdfContext.memory_pool` to be used for
+allocations in the context of the user-defined function.
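+
+For illustration only, a UDF could route its allocations through the
+context's pool; the ``add_one`` function below is hypothetical (not part of
+the example above) and relies on the fact that compute functions such as
+:func:`pyarrow.compute.add` accept a ``memory_pool`` keyword argument:
+
+.. code-block:: python
+
+   def add_one(ctx, x):
+       # Allocate the result from the memory pool supplied by the
+       # UDF context rather than from the default memory pool.
+       return pc.add(x, 1, memory_pool=ctx.memory_pool)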
+
+You can call a user-defined function directly using
+:func:`pyarrow.compute.call_function`:
+
+.. code-block:: python
+
+   >>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
+   <pyarrow.Int64Scalar: 9>
+   >>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
+   <pyarrow.lib.Int64Array object at 0x...>
+   [
+     27,
+     3,
+     1
+   ]
+
+Working with Datasets
+---------------------
+
+More generally, user-defined functions are usable everywhere a compute
+function can be referred to by its name. For example, they can be called on
+a dataset's column using :meth:`Expression._call`.
+
+Consider an instance where the data is in a table and we want to compute
+the GCD of one column with the scalar value 30. We will be re-using the
+"numpy_gcd" user-defined function that was created above:
+
+.. code-block:: python
+
+   >>> import pyarrow.dataset as ds
+   >>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'],
+   ...                        'value': [90, 630, 1827, 2709]})
+   >>> dataset = ds.dataset(data_table)
+   >>> func_args = [pc.scalar(30), ds.field("value")]
+   >>> dataset.to_table(
+   ...     columns={
+   ...         'gcd_value': ds.field('')._call("numpy_gcd", func_args),
+   ...         'value': ds.field('value'),
+   ...         'category': ds.field('category')
+   ...     })
+   pyarrow.Table
+   gcd_value: int64
+   value: int64
+   category: string
+   ----
+   gcd_value: [[30,30,3,3]]
+   value: [[90,630,1827,2709]]
+   category: [["A","B","C","D"]]
+
+Note that ``ds.field('')._call(...)`` returns a
+:class:`pyarrow.compute.Expression`.
+The arguments passed to this function call are expressions, not scalar values
+(notice the difference between :func:`pyarrow.scalar` and
+:func:`pyarrow.compute.scalar`; the latter produces an expression).
+This expression is evaluated when the projection operator executes it.
+
+Projection Expressions
+^^^^^^^^^^^^^^^^^^^^^^
+In the above example we used an expression to add a new column (``gcd_value``)
+to our table. Adding new, dynamically computed columns to a table is known as
+"projection", and there are limitations on what kinds of functions can be used
+in projection expressions.
+A projection function must emit a single output value for each input row. That
+output value should be calculated entirely from the input row and should not
+depend on any other row. For example, the "numpy_gcd" function that we've been
+using as an example above is a valid function to use in a projection. A
+"cumulative sum" function would not be a valid function, since the result for
+each input row depends on the rows that came before. A "drop nulls" function
+would also be invalid, because it doesn't emit a value for some rows.
diff --git a/python/pyarrow/src/udf.h b/python/pyarrow/src/udf.h
index 52f22b4cb4f..a110440315a 100644
--- a/python/pyarrow/src/udf.h
+++ b/python/pyarrow/src/udf.h
@@ -41,6 +41,7 @@ struct ARROW_PYTHON_EXPORT ScalarUdfOptions {
   std::shared_ptr<DataType> output_type;
 };
 
+/// \brief A context passed as the first argument of scalar UDF functions.
 struct ARROW_PYTHON_EXPORT ScalarUdfContext {
   MemoryPool* pool;
   int64_t batch_length;