Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cpp/src/arrow/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,16 @@ std::string TypeHolder::ToString(const std::vector<TypeHolder>& types) {
return ss.str();
}

std::vector<TypeHolder> TypeHolder::FromTypes(
const std::vector<std::shared_ptr<DataType>>& types) {
std::vector<TypeHolder> type_holders;
type_holders.reserve(types.size());
for (const auto& type : types) {
type_holders.emplace_back(type);
}
return type_holders;
}

// ----------------------------------------------------------------------

FloatingPointType::Precision HalfFloatType::precision() const {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ struct ARROW_EXPORT TypeHolder {
}

static std::string ToString(const std::vector<TypeHolder>&);

static std::vector<TypeHolder> FromTypes(
const std::vector<std::shared_ptr<DataType>>& types);
};

ARROW_EXPORT
Expand Down
186 changes: 180 additions & 6 deletions python/pyarrow/_compute.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ import inspect
import numpy as np


def _forbid_instantiation(klass, subclasses_instead=True):
msg = '{} is an abstract class thus cannot be initialized.'.format(
klass.__name__
)
if subclasses_instead:
subclasses = [cls.__name__ for cls in klass.__subclasses__]
msg += ' Use one of the subclasses instead: {}'.format(
', '.join(subclasses)
)
raise TypeError(msg)


cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func):
"""
Wrap a C++ scalar Function in a ScalarFunction object.
Expand Down Expand Up @@ -2574,7 +2586,7 @@ cdef object box_scalar_udf_context(const CScalarUdfContext& c_context):
return context


cdef _scalar_udf_callback(user_function, const CScalarUdfContext& c_context, inputs):
cdef _udf_callback(user_function, const CScalarUdfContext& c_context, inputs):
"""
Helper callback function used to wrap the ScalarUdfContext from Python to C++
execution.
Expand All @@ -2591,8 +2603,30 @@ def _get_scalar_udf_context(memory_pool, batch_length):
return context


def register_scalar_function(func, function_name, function_doc, in_types,
out_type):
ctypedef CStatus (*CRegisterUdf)(PyObject* function, function[CallbackUdf] wrapper,
const CUdfOptions& options, CFunctionRegistry* registry)

cdef class RegisterUdf(_Weakrefable):
    # Thin wrapper that carries a C++ UDF-registration function pointer
    # (CRegisterUdf) inside a Python-visible object, so the scalar and tabular
    # registration paths can share one code path and differ only in which
    # C++ registrar they hold.
    cdef CRegisterUdf register_func

    cdef void init(self, const CRegisterUdf register_func):
        # Store the C++ registration entry point for later invocation.
        self.register_func = register_func


cdef get_register_scalar_function():
    # Build a RegisterUdf wrapper pointing at the C++ scalar-function
    # registrar. __new__ is used because RegisterUdf has no Python-level
    # constructor; the pointer is assigned directly.
    cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf)
    reg.register_func = RegisterScalarFunction
    return reg


cdef get_register_tabular_function():
    # Build a RegisterUdf wrapper pointing at the C++ tabular-function
    # registrar. Mirrors get_register_scalar_function; only the held
    # function pointer differs.
    cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf)
    reg.register_func = RegisterTabularFunction
    return reg

Comment on lines +2606 to +2626
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

appreciate your effort to make this more generic, but with the other UDFs, we have a complex structure for the callbacks. In that case this generalization won't be that useful as far as I can tell. For the moment, shall we keep them separate? There would be a sort of DRY violation here, but once we generalize this API after our experimental version, we should be able to streamline these.

cc @westonpace

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean revert to the original register_scalar_udf code, then duplicate it and adapt to register_tabular_udf? The disadvantage of code repetition is clear, but what is the advantage? The remaining PR would still need to resolve the conflict, since you say the callback structure changed in that PR, only this time the conflict would not be caught by source control tools, because the register_scalar_udf code would not have changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe I didn't clearly state it in the description, do we need this interface RegisterUdf? Is it a must? I was referring to this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proposed common design for scalar and tabular function registration requires a parameter to distinguish between the two. I considered 3 options to do so:

  1. Using a Boolean parameter, such as is_tabular. This seemed the least elegant to me, and is definitely the least extensible.
  2. Using an enum parameter, which would currently have two values for scalar and tabular. This is better but seemed undesirable because it is not clear what this enum is. Is it the function's kind? the function output's kind? something else? See also this post.
  3. Using an interface parameter. This seemed elegant and extensible to me. The interface just describes how the function is registered, and is easy enough to implement. At least for the time being, the intention is for this interface to be internal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference would be to keep this and, if we have something better when we add aggregate udfs, we can switch to that then. That being said, I'm not 100% certain I follow the arguments here.


def register_scalar_function(func, function_name, function_doc, in_types, out_type,
func_registry=None):
"""
Register a user-defined scalar function.

Expand Down Expand Up @@ -2633,6 +2667,8 @@ def register_scalar_function(func, function_name, function_doc, in_types,
arity.
out_type : DataType
Output type of the function.
func_registry : FunctionRegistry
Optional function registry to use instead of the default global one.

Examples
--------
Expand Down Expand Up @@ -2662,14 +2698,106 @@ def register_scalar_function(func, function_name, function_doc, in_types,
21
]
"""
return _register_scalar_like_function(get_register_scalar_function(),
func, function_name, function_doc, in_types,
out_type, func_registry)


def register_tabular_function(func, function_name, function_doc, in_types, out_type,
                              func_registry=None):
    """
    Register a user-defined tabular function.

    A tabular function is one accepting a context argument of type
    ScalarUdfContext and returning a generator of struct arrays.
    The in_types argument must be empty and the out_type argument
    specifies a schema. Each struct array must have field types
    corresponding to the schema.

    Parameters
    ----------
    func : callable
        A callable implementing the user-defined function.
        The only argument is the context argument of type
        ScalarUdfContext. It must return a callable that
        returns on each invocation a StructArray matching
        the out_type, where an empty array indicates end.
    function_name : str
        Name of the function. This name must be globally unique.
    function_doc : dict
        A dictionary object with keys "summary" (str),
        and "description" (str).
    in_types : Dict[str, DataType]
        Must be an empty dictionary (reserved for future use).
    out_type : Union[Schema, DataType]
        Schema of the function's output, or a corresponding flat struct type.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.
    """
    cdef:
        shared_ptr[CSchema] c_schema
        shared_ptr[CDataType] c_type

    # Normalize a Schema out_type into the equivalent flat struct DataType so
    # the shared registration helper only ever deals with a DataType.
    if isinstance(out_type, Schema):
        c_schema = pyarrow_unwrap_schema(out_type)
        with nogil:
            c_type = <shared_ptr[CDataType]>make_shared[CStructType](deref(c_schema).fields())
        out_type = pyarrow_wrap_data_type(c_type)
    # Delegate to the common scalar-like registration path, parameterized by
    # the tabular registrar.
    return _register_scalar_like_function(get_register_tabular_function(),
                                          func, function_name, function_doc, in_types,
                                          out_type, func_registry)


def _register_scalar_like_function(register_func, func, function_name, function_doc, in_types,
out_type, func_registry=None):
"""
Register a user-defined scalar-like function.

A scalar-like function is a callable accepting a first
context argument of type ScalarUdfContext as well as
possibly additional Arrow arguments, and returning a
an Arrow result appropriate for the kind of function.
A scalar function and a tabular function are examples
for scalar-like functions.
This function is normally not called directly but via
register_scalar_function or register_tabular_function.

Parameters
----------
register_func: object
An object holding a CRegisterUdf in a "register_func" attribute. Use
get_register_scalar_function() for a scalar function and
get_register_tabular_function() for a tabular function.
func : callable
A callable implementing the user-defined function.
See register_scalar_function and
register_tabular_function for details.

function_name : str
Name of the function. This name must be globally unique.
function_doc : dict
A dictionary object with keys "summary" (str),
and "description" (str).
in_types : Dict[str, DataType]
A dictionary mapping function argument names to
their respective DataType.
See register_scalar_function and
register_tabular_function for details.
out_type : DataType
Output type of the function.
func_registry : FunctionRegistry
Optional function registry to use instead of the default global one.
"""
cdef:
CRegisterUdf c_register_func
c_string c_func_name
CArity c_arity
CFunctionDoc c_func_doc
vector[shared_ptr[CDataType]] c_in_types
PyObject* c_function
shared_ptr[CDataType] c_out_type
CScalarUdfOptions c_options
CUdfOptions c_options
CFunctionRegistry* c_func_registry

if callable(func):
c_function = <PyObject*>func
Expand Down Expand Up @@ -2711,5 +2839,51 @@ def register_scalar_function(func, function_name, function_doc, in_types,
c_options.input_types = c_in_types
c_options.output_type = c_out_type

check_status(RegisterScalarFunction(c_function,
<function[CallbackUdf]> &_scalar_udf_callback, c_options))
if func_registry is None:
c_func_registry = NULL
else:
c_func_registry = (<FunctionRegistry>func_registry).registry

c_register_func = (<RegisterUdf>register_func).register_func

check_status(c_register_func(c_function,
<function[CallbackUdf]> &_udf_callback,
c_options, c_func_registry))


def call_tabular_function(function_name, args=None, func_registry=None):
    """
    Get a record batch iterator from a tabular function.

    Parameters
    ----------
    function_name : str
        Name of the function.
    args : iterable
        The arguments to pass to the function. Accepted types depend
        on the specific function. Currently, only an empty args is supported.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.

    Returns
    -------
    RecordBatchReader
        A reader yielding the record batches produced by the function.
    """
    cdef:
        c_string c_func_name
        vector[CDatum] c_args
        CFunctionRegistry* c_func_registry
        shared_ptr[CRecordBatchReader] c_reader
        RecordBatchReader reader

    c_func_name = tobytes(function_name)
    # NULL registry means "use the default global registry" on the C++ side.
    if func_registry is None:
        c_func_registry = NULL
    else:
        c_func_registry = (<FunctionRegistry>func_registry).registry
    if args is None:
        args = []
    _pack_compute_args(args, &c_args)

    with nogil:
        c_reader = GetResultValue(CallTabularFunction(
            c_func_name, c_args, c_func_registry))
    # __new__ bypasses the Python-level constructor so the C++ reader can be
    # attached directly; from_batches re-wraps it with the schema exposed.
    reader = RecordBatchReader.__new__(RecordBatchReader)
    reader.reader = c_reader
    return RecordBatchReader.from_batches(pyarrow_wrap_schema(deref(c_reader).schema()), reader)
13 changes: 1 addition & 12 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,13 @@ from pyarrow.lib cimport *
from pyarrow.lib import ArrowTypeError, frombytes, tobytes, _pc
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow._compute cimport Expression, _bind
from pyarrow._compute import _forbid_instantiation
from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
from pyarrow._csv cimport (
ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path


def _forbid_instantiation(klass, subclasses_instead=True):
msg = '{} is an abstract class thus cannot be initialized.'.format(
klass.__name__
)
if subclasses_instead:
subclasses = [cls.__name__ for cls in klass.__subclasses__]
msg += ' Use one of the subclasses instead: {}'.format(
', '.join(subclasses)
)
raise TypeError(msg)


_orc_fileformat = None
_orc_imported = False

Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@
list_functions,
_group_by,
# Udf
call_tabular_function,
register_scalar_function,
register_tabular_function,
ScalarUdfContext,
# Expressions
Expression,
Expand Down
25 changes: 22 additions & 3 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
vector[shared_ptr[CField]] GetAllFieldsByName(const c_string& name)
int GetFieldIndex(const c_string& name)
vector[int] GetAllFieldIndices(const c_string& name)
const vector[shared_ptr[CField]] fields()
int num_fields()
c_string ToString()

Expand Down Expand Up @@ -800,6 +801,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
const shared_ptr[CSchema]& schema, int64_t num_rows,
const vector[shared_ptr[CArray]]& columns)

CResult[shared_ptr[CStructArray]] ToStructArray() const

@staticmethod
CResult[shared_ptr[CRecordBatch]] FromStructArray(
const shared_ptr[CArray]& array)
Expand Down Expand Up @@ -2805,17 +2808,33 @@ cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:

ctypedef PyObject* CallbackUdf(object user_function, const CScalarUdfContext& context, object inputs)

cdef extern from "arrow/python/udf.h" namespace "arrow::py":

cdef extern from "arrow/api.h" namespace "arrow" nogil:

cdef cppclass CRecordBatchIterator "arrow::RecordBatchIterator"(
CIterator[shared_ptr[CRecordBatch]]):
pass


cdef extern from "arrow/python/udf.h" namespace "arrow::py" nogil:
cdef cppclass CScalarUdfContext" arrow::py::ScalarUdfContext":
CMemoryPool *pool
int64_t batch_length

cdef cppclass CScalarUdfOptions" arrow::py::ScalarUdfOptions":
cdef cppclass CUdfOptions" arrow::py::UdfOptions":
c_string func_name
CArity arity
CFunctionDoc func_doc
vector[shared_ptr[CDataType]] input_types
shared_ptr[CDataType] output_type

CStatus RegisterScalarFunction(PyObject* function,
function[CallbackUdf] wrapper, const CScalarUdfOptions& options)
function[CallbackUdf] wrapper, const CUdfOptions& options,
CFunctionRegistry* registry)

CStatus RegisterTabularFunction(PyObject* function,
function[CallbackUdf] wrapper, const CUdfOptions& options,
CFunctionRegistry* registry)

CResult[shared_ptr[CRecordBatchReader]] CallTabularFunction(
const c_string& func_name, const vector[CDatum]& args, CFunctionRegistry* registry)
7 changes: 0 additions & 7 deletions python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@ from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_fs cimport *


cdef extern from "arrow/api.h" namespace "arrow" nogil:

cdef cppclass CRecordBatchIterator "arrow::RecordBatchIterator"(
CIterator[shared_ptr[CRecordBatch]]):
pass


cdef extern from "arrow/dataset/plan.h" namespace "arrow::dataset::internal" nogil:

cdef void Initialize()
Expand Down
Loading