From 57ccea0ec86793b8d85ebf164b0542ae3066d9e7 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 10 Sep 2020 18:22:11 +0200 Subject: [PATCH 1/4] ARROW-9465: [Python] Improve ergonomics of compute module Lots of assorted things here: * Automatically generate global wrappers for calling registered compute functions * Improve metadata of such wrappers (e.g. name, docstring) * Make it easier to pass options (for example via kwargs) * Type-check options * Add some docstrings * Expose more function attributes (e.g. arity) * Fix some crashes --- cpp/src/arrow/compute/api_aggregate.h | 2 +- cpp/src/arrow/compute/exec.cc | 3 - cpp/src/arrow/compute/function.cc | 3 + cpp/src/arrow/compute/function.h | 6 +- cpp/src/arrow/compute/kernel.cc | 5 + cpp/src/arrow/compute/kernel_test.cc | 9 + .../kernels/aggregate_basic_internal.h | 6 +- .../arrow/compute/kernels/aggregate_test.cc | 6 +- docs/source/cpp/compute.rst | 2 +- python/pyarrow/_compute.pyx | 171 ++++++++++++- python/pyarrow/compute.py | 229 ++++++++++++------ python/pyarrow/includes/libarrow.pxd | 11 + python/pyarrow/tests/test_compute.py | 171 +++++++++++-- r/src/compute.cpp | 2 +- 14 files changed, 510 insertions(+), 116 deletions(-) diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 675bd25a684..b595e0ec733 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -66,7 +66,7 @@ struct ARROW_EXPORT MinMaxOptions : public FunctionOptions { /// Skip null values SKIP = 0, /// Any nulls will result in null output - OUTPUT_NULL + EMIT_NULL }; explicit MinMaxOptions(enum Mode null_handling = SKIP) : null_handling(null_handling) {} diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index fbc3693e5be..435d7dd74b2 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -952,9 +952,6 @@ Result CallFunction(const std::string& func_name, const std::vector func, ctx->func_registry()->GetFunction(func_name)); - if (options == nullptr) { - options = func->default_options(); - } return func->Execute(args, options, ctx); } diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 41c3e360a07..6205274cc96 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -103,6 +103,9 @@ Result DispatchExactImpl(const Function& func, Result Function::Execute(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const { + if (options == nullptr) { + options = default_options(); + } if (ctx == nullptr) { ExecContext default_ctx; return Execute(args, options, &default_ctx); diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 93a200ee212..0b800ade6c9 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -65,7 +65,8 @@ struct ARROW_EXPORT Arity { /// invoking the function static Arity VarArgs(int min_args = 0) { return Arity(min_args, true); } - explicit Arity(int num_args, bool is_varargs = false) + // NOTE: the 0-argument form (default constructor) is required for Cython + explicit Arity(int num_args = 0, bool is_varargs = false) : num_args(num_args), is_varargs(is_varargs) {} /// The number of required arguments (or the minimum number for varargs @@ -124,8 +125,7 @@ class ARROW_EXPORT Function { /// kernel dispatch, batch iteration, and memory allocation details taken /// care of. /// - /// Function implementations may assume that options is non-null and valid - /// or to forgo options and accept only nullptr for that argument. + /// If the `options` pointer is null, then `default_options()` will be used. /// /// This function can be overridden in subclasses. virtual Result Execute(const std::vector& args, diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc index 1788eb72963..88b42716fa2 100644 --- a/cpp/src/arrow/compute/kernel.cc +++ b/cpp/src/arrow/compute/kernel.cc @@ -281,6 +281,9 @@ std::string InputType::ToString() const { } ss << "["; switch (kind_) { + case InputType::ANY_TYPE: + ss << "any"; + break; case InputType::EXACT_TYPE: ss << type_->ToString(); break; @@ -303,6 +306,8 @@ bool InputType::Equals(const InputType& other) const { return false; } switch (kind_) { + case InputType::ANY_TYPE: + return true; case InputType::EXACT_TYPE: return type_->Equals(*other.type_); case InputType::USE_TYPE_MATCHER: diff --git a/cpp/src/arrow/compute/kernel_test.cc b/cpp/src/arrow/compute/kernel_test.cc index 2eb7fd11449..df18fceaa20 100644 --- a/cpp/src/arrow/compute/kernel_test.cc +++ b/cpp/src/arrow/compute/kernel_test.cc @@ -141,6 +141,15 @@ TEST(InputType, Constructors) { InputType ty7(match::TimestampTypeUnit(TimeUnit::MICRO)); ASSERT_EQ("any[timestamp(us)]", ty7.ToString()); + + InputType ty8; + InputType ty9(ValueDescr::ANY); + InputType ty10(ValueDescr::ARRAY); + InputType ty11(ValueDescr::SCALAR); + ASSERT_EQ("any[any]", ty8.ToString()); + ASSERT_EQ("any[any]", ty9.ToString()); + ASSERT_EQ("array[any]", ty10.ToString()); + ASSERT_EQ("scalar[any]", ty11.ToString()); } TEST(InputType, Equals) { diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 0d8d43d95d6..3776b20964e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -418,7 +418,7 @@ struct MinMaxImpl : public ScalarAggregator { local.has_nulls = null_count > 0; local.has_values = (arr.length() - null_count) > 0; - if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { + if (local.has_nulls && options.null_handling == MinMaxOptions::EMIT_NULL) { this->state = local; return; } @@ -443,7 +443,7 @@ struct MinMaxImpl : public ScalarAggregator { std::vector> values; if (!state.has_values || - (state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) { + (state.has_nulls && options.null_handling == MinMaxOptions::EMIT_NULL)) { // (null, null) values = {std::make_shared(), std::make_shared()}; } else { @@ -533,7 +533,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.has_nulls = null_count > 0; local.has_values = valid_count > 0; - if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) { + if (local.has_nulls && options.null_handling == MinMaxOptions::EMIT_NULL) { this->state = local; return; } diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index b1ebd861490..942691e9aee 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -513,7 +513,7 @@ TEST_F(TestBooleanMinMaxKernel, Basics) { this->AssertMinMaxIs(chunked_input2, false, false, options); this->AssertMinMaxIs(chunked_input3, false, true, options); - options = MinMaxOptions(MinMaxOptions::OUTPUT_NULL); + options = MinMaxOptions(MinMaxOptions::EMIT_NULL); this->AssertMinMaxIsNull("[]", options); this->AssertMinMaxIsNull("[null, null, null]", options); this->AssertMinMaxIsNull("[false, null, false]", options); @@ -543,7 +543,7 @@ TYPED_TEST(TestIntegerMinMaxKernel, Basics) { this->AssertMinMaxIs(chunked_input2, 1, 9, options); this->AssertMinMaxIs(chunked_input3, 1, 9, options); - options = MinMaxOptions(MinMaxOptions::OUTPUT_NULL); + options = MinMaxOptions(MinMaxOptions::EMIT_NULL); this->AssertMinMaxIs("[5, 1, 2, 3, 4]", 1, 5, options); // output null this->AssertMinMaxIsNull("[5, null, 2, 3, 4]", options); @@ -570,7 +570,7 @@ TYPED_TEST(TestFloatingMinMaxKernel, Floats) { this->AssertMinMaxIs(chunked_input2, 1, 9, options); this->AssertMinMaxIs(chunked_input3, 1, 9, options); - options = MinMaxOptions(MinMaxOptions::OUTPUT_NULL); + options = MinMaxOptions(MinMaxOptions::EMIT_NULL); this->AssertMinMaxIs("[5, 1, 2, 3, 4]", 1, 5, options); this->AssertMinMaxIs("[5, -Inf, 2, 3, 4]", -INFINITY, 5, options); // output null diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 111384929cb..3a0abe39a38 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -83,7 +83,7 @@ Some functions accept or require an options structure that determines the exact semantics of the function:: MinMaxOptions options; - options.null_handling = MinMaxOptions::OUTPUT_NULL; + options.null_handling = MinMaxOptions::EMIT_NULL; std::shared_ptr array = ...; arrow::Datum min_max_datum; diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 1b5535032f0..8026587fa48 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -26,18 +26,27 @@ import numpy as np cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func): + """ + Wrap a C++ scalar Function in a ScalarFunction object. + """ cdef ScalarFunction func = ScalarFunction.__new__(ScalarFunction) func.init(sp_func) return func cdef wrap_vector_function(const shared_ptr[CFunction]& sp_func): + """ + Wrap a C++ vector Function in a VectorFunction object. + """ cdef VectorFunction func = VectorFunction.__new__(VectorFunction) func.init(sp_func) return func cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func): + """ + Wrap a C++ aggregate Function in a ScalarAggregateFunction object. + """ cdef ScalarAggregateFunction func = ( ScalarAggregateFunction.__new__(ScalarAggregateFunction) ) @@ -46,6 +55,9 @@ cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func): cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func): + """ + Wrap a C++ meta Function in a MetaFunction object. + """ cdef MetaFunction func = ( MetaFunction.__new__(MetaFunction) ) @@ -54,6 +66,11 @@ cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func): cdef wrap_function(const shared_ptr[CFunction]& sp_func): + """ + Wrap a C++ Function in a Function object. + + This dispatches to specialized wrappers depending on the function kind. + """ if sp_func.get() == NULL: raise ValueError('Function was NULL') @@ -97,6 +114,11 @@ cdef wrap_scalar_aggregate_kernel(const CScalarAggregateKernel* c_kernel): cdef class Kernel(_Weakrefable): + """ + A kernel object. + + Kernels handle the execution of a Function for a certain signature. + """ def __init__(self): raise TypeError("Do not call {}'s constructor directly" @@ -140,6 +162,29 @@ cdef class ScalarAggregateKernel(Kernel): cdef class Function(_Weakrefable): + """ + A compute function. + + A function implements a certain logical computation over a range of + possible input signatures. Each signature accepts a range of input + types and is implemented by a given Kernel. + + Functions can be of different kinds: + + * "scalar" functions apply an item-wise computation over all items + of their inputs. Each item in the output only depends on the values + of the inputs at the same position. Examples: addition, comparisons, + string predicates... + + * "vector" functions apply a collection-wise computation, such that + each item in the output may depend on the values of several items + in each input. Examples: dictionary encoding, sorting, extracting + unique values... + + * "aggregate" functions reduce the dimensionality of the inputs by + applying a reduction function. Examples: sum, minmax, mode... + + """ cdef: shared_ptr[CFunction] sp_func CFunction* base_func @@ -153,13 +198,40 @@ cdef class Function(_Weakrefable): self.base_func = sp_func.get() def __repr__(self): - return """arrow.compute.Function -kind: {} -num_kernels: {} -""".format(self.kind, self.num_kernels) + return ("arrow.compute.Function" + ).format(self.name, self.kind, self.arity, self.num_kernels) + + def __reduce__(self): + # Reduction uses the global registry + return get_function, (self.name,) + + @property + def name(self): + """ + The function name. + """ + return frombytes(self.base_func.name()) + + @property + def arity(self): + """ + The function arity. + + If Ellipsis (i.e. `...`) is returned, the function takes a variable + number of arguments. + """ + cdef CArity arity = self.base_func.arity() + if arity.is_varargs: + return ... + else: + return arity.num_args @property def kind(self): + """ + The function kind. + """ cdef FunctionKind c_kind = self.base_func.kind() if c_kind == FunctionKind_SCALAR: return 'scalar' @@ -167,15 +239,23 @@ num_kernels: {} return 'vector' elif c_kind == FunctionKind_SCALAR_AGGREGATE: return 'scalar_aggregate' + elif c_kind == FunctionKind_META: + return 'meta' else: raise NotImplementedError("Unknown Function::Kind") @property def num_kernels(self): + """ + The number of kernels implementing this function. + """ return self.base_func.num_kernels() def call(self, args, FunctionOptions options=None, MemoryPool memory_pool=None): + """ + Call the function on the given arguments. + """ cdef: const CFunctionOptions* c_options = NULL CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) @@ -204,7 +284,11 @@ cdef class ScalarFunction(Function): Function.init(self, sp_func) self.func = sp_func.get() - def list_kernels(self): + @property + def kernels(self): + """ + The kernels implementing this function. + """ cdef vector[const CScalarKernel*] kernels = self.func.kernels() return [wrap_scalar_kernel(k) for k in kernels] @@ -217,7 +301,11 @@ cdef class VectorFunction(Function): Function.init(self, sp_func) self.func = sp_func.get() - def list_kernels(self): + @property + def kernels(self): + """ + The kernels implementing this function. + """ cdef vector[const CVectorKernel*] kernels = self.func.kernels() return [wrap_vector_kernel(k) for k in kernels] @@ -230,7 +318,11 @@ cdef class ScalarAggregateFunction(Function): Function.init(self, sp_func) self.func = sp_func.get() - def list_kernels(self): + @property + def kernels(self): + """ + The kernels implementing this function. + """ cdef vector[const CScalarAggregateKernel*] kernels = ( self.func.kernels() ) @@ -245,6 +337,15 @@ cdef class MetaFunction(Function): Function.init(self, sp_func) self.func = sp_func.get() + # Since num_kernels is exposed, also expose a kernels property + + @property + def kernels(self): + """ + The kernels implementing this function. + """ + return [] + cdef _pack_compute_args(object values, vector[CDatum]* out): for val in values: @@ -274,10 +375,16 @@ cdef class FunctionRegistry(_Weakrefable): self.registry = GetFunctionRegistry() def list_functions(self): + """ + Return all function names in the registry. + """ cdef vector[c_string] names = self.registry.GetFunctionNames() return [frombytes(name) for name in names] def get_function(self, name): + """ + Look up a function by name in the registry. + """ cdef: c_string c_name = tobytes(name) shared_ptr[CFunction] func @@ -293,7 +400,30 @@ def function_registry(): return _global_func_registry +def get_function(name): + """ + Get a function by name. + + The function is looked up in the global registry + (as returned by `function_registry()`). + """ + return _global_func_registry.get_function(name) + + +def list_functions(): + """ + Return all function names in the global registry. + """ + return _global_func_registry.list_functions() + + def call_function(name, args, options=None, memory_pool=None): + """ + Call a named function. + + The function is looked up in the global registry + (as returned by `function_registry()`). + """ func = _global_func_registry.get_function(name) return func.call(args, options=options, memory_pool=memory_pool) @@ -308,7 +438,7 @@ cdef class CastOptions(FunctionOptions): __slots__ = () # avoid mistakingly creating attributes - def __init__(self, DataType target_type=None, allow_int_overflow=None, + def __init__(self, DataType target_type=None, *, allow_int_overflow=None, allow_time_truncate=None, allow_time_overflow=None, allow_float_truncate=None, allow_invalid_utf8=None): if allow_int_overflow is not None: @@ -429,9 +559,8 @@ cdef class FilterOptions(FunctionOptions): ) else: raise ValueError( - '"{}" is not a valid null_selection_behavior'.format( - null_selection_behavior) - ) + '"{}" is not a valid null_selection_behavior' + .format(null_selection_behavior)) cdef const CFunctionOptions* get_options(self) except NULL: return &self.filter_options @@ -441,8 +570,26 @@ cdef class TakeOptions(FunctionOptions): cdef: CTakeOptions take_options - def __init__(self, boundscheck=True): + def __init__(self, *, boundscheck=True): self.take_options.boundscheck = boundscheck cdef const CFunctionOptions* get_options(self) except NULL: return &self.take_options + + +cdef class MinMaxOptions(FunctionOptions): + cdef: + CMinMaxOptions min_max_options + + def __init__(self, null_handling='skip'): + if null_handling == 'skip': + self.min_max_options.null_handling = CMinMaxMode_SKIP + elif null_handling == 'emit_null': + self.min_max_options.null_handling = CMinMaxMode_EMIT_NULL + else: + raise ValueError( + '"{}" is not a valid null_handling' + .format(null_handling)) + + cdef const CFunctionOptions* get_options(self) except NULL: + return &self.min_max_options diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index c4c1e70d089..2204471b0ee 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -17,16 +17,164 @@ from pyarrow._compute import ( # noqa - FilterOptions, Function, FunctionRegistry, + Kernel, + ScalarAggregateFunction, + ScalarAggregateKernel, + ScalarFunction, + ScalarKernel, + VectorFunction, + VectorKernel, + # Option classes + CastOptions, + FilterOptions, + MatchSubstringOptions, + MinMaxOptions, + TakeOptions, + # Functions function_registry, call_function, - TakeOptions + get_function, + list_functions, ) +from textwrap import dedent + import pyarrow as pa -import pyarrow._compute as _pc + + +def _decorate_compute_function(wrapper, exposed_name, func, option_class): + wrapper.__arrow_compute_function__ = dict(name=func.name, + arity=func.arity) + wrapper.__name__ = exposed_name + wrapper.__qualname__ = exposed_name + + # TODO (ARROW-9164): expose actual docstring from C++ + doc_pieces = [] + arg_str = "arguments" if func.arity > 1 else "argument" + doc_pieces.append("""\ + Call compute function {!r} with the given {}. + + Parameters + ---------- + """.format(func.name, arg_str)) + + if func.arity == 1: + doc_pieces.append("""\ + arg : Array-like or scalar-like + Argument to compute function + """) + elif func.arity == 2: + doc_pieces.append("""\ + left : Array-like or scalar-like + First argument to compute function + right : Array-like or scalar-like + Second argument to compute function + """) + + doc_pieces.append("""\ + memory_pool : pyarrow.MemoryPool, optional + If not passed, will allocate memory from the default memory pool. + """) + if option_class is not None: + doc_pieces.append("""\ + options : pyarrow.compute.{0}, optional + Parameters altering compute function semantics + **kwargs: optional + Parameters for {0} constructor. Either `options` + or `**kwargs` can be passed, but not both at the same time. + """.format(option_class.__name__)) + + wrapper.__doc__ = "".join(dedent(s) for s in doc_pieces) + return wrapper + + +_option_classes = { + # TODO this is not complete + # (export the option class name from C++ metadata?) + 'cast': CastOptions, + 'filter': FilterOptions, + 'match_substring': MatchSubstringOptions, + 'min_max': MinMaxOptions, + 'take': TakeOptions, +} + + +def _handle_options(name, option_class, options, kwargs): + if kwargs: + if options is None: + return option_class(**kwargs) + raise TypeError( + "Function {!r} called with both an 'options' argument " + "and additional named arguments" + .format(name)) + + if options is not None: + if isinstance(options, dict): + return option_class(**options) + elif isinstance(options, option_class): + return options + raise TypeError( + "Function {!r} expected a {} parameter, got {}" + .format(name, option_class, type(options))) + + return options + + +def _simple_unary_function(name): + func = get_function(name) + option_class = _option_classes.get(name) + + if option_class is not None: + def wrapper(arg, *, options=None, memory_pool=None, **kwargs): + options = _handle_options(name, option_class, options, kwargs) + return func.call([arg], options, memory_pool) + else: + def wrapper(arg, *, memory_pool=None): + return func.call([arg], None, memory_pool) + + return _decorate_compute_function(wrapper, name, func, option_class) + + +def _simple_binary_function(name): + func = get_function(name) + option_class = _option_classes.get(name) + + if option_class is not None: + def wrapper(left, right, *, options=None, memory_pool=None, **kwargs): + options = _handle_options(name, option_class, options, kwargs) + return func.call([left, right], options, memory_pool) + else: + def wrapper(left, right, *, memory_pool=None): + return func.call([left, right], None, memory_pool) + + return _decorate_compute_function(wrapper, name, func, option_class) + + +def _make_global_functions(): + """ + Make global functions wrapping each compute function. + + Note that some of the automatically-generated wrappers may be overriden + by custom versions below. + """ + g = globals() + reg = function_registry() + + for name in reg.list_functions(): + assert name not in g, name + func = reg.get_function(name) + if func.arity == 1: + g[name] = _simple_unary_function(name) + elif func.arity == 2: + g[name] = _simple_binary_function(name) + else: + raise NotImplementedError("Unsupported function arity: ", + func.arity) + + +_make_global_functions() def cast(arr, target_type, safe=True): @@ -81,75 +229,12 @@ def cast(arr, target_type, safe=True): if target_type is None: raise ValueError("Cast target type must not be None") if safe: - options = _pc.CastOptions.safe(target_type) + options = CastOptions.safe(target_type) else: - options = _pc.CastOptions.unsafe(target_type) + options = CastOptions.unsafe(target_type) return call_function("cast", [arr], options) -def _decorate_compute_function(func, name, *, arity): - func.__arrow_compute_function__ = dict(name=name, arity=arity) - return func - - -def _simple_unary_function(name): - def func(arg): - return call_function(name, [arg]) - return _decorate_compute_function(func, name, arity=1) - - -def _simple_binary_function(name): - def func(left, right): - return call_function(name, [left, right]) - return _decorate_compute_function(func, name, arity=2) - - -binary_length = _simple_unary_function('binary_length') -ascii_upper = _simple_unary_function('ascii_upper') -ascii_lower = _simple_unary_function('ascii_lower') -utf8_upper = _simple_unary_function('utf8_upper') -utf8_lower = _simple_unary_function('utf8_lower') - -string_is_ascii = _simple_unary_function('string_is_ascii') - -ascii_is_alnum = _simple_unary_function('ascii_is_alnum') -utf8_is_alnum = _simple_unary_function('utf8_is_alnum') -ascii_is_alpha = _simple_unary_function('ascii_is_alpha') -utf8_is_alpha = _simple_unary_function('utf8_is_alpha') -ascii_is_decimal = _simple_unary_function('ascii_is_decimal') -utf8_is_decimal = _simple_unary_function('utf8_is_decimal') -ascii_is_digit = ascii_is_decimal # alias -utf8_is_digit = _simple_unary_function('utf8_is_digit') -ascii_is_lower = _simple_unary_function('ascii_is_lower') -utf8_is_lower = _simple_unary_function('utf8_is_lower') -ascii_is_numeric = ascii_is_decimal # alias -utf8_is_numeric = _simple_unary_function('utf8_is_numeric') -ascii_is_printable = _simple_unary_function('ascii_is_printable') -utf8_is_printable = _simple_unary_function('utf8_is_printable') -ascii_is_title = _simple_unary_function('ascii_is_title') -utf8_is_title = _simple_unary_function('utf8_is_title') -ascii_is_upper = _simple_unary_function('ascii_is_upper') -utf8_is_upper = _simple_unary_function('utf8_is_upper') - -is_valid = _simple_unary_function('is_valid') -is_null = _simple_unary_function('is_null') - -list_flatten = _simple_unary_function('list_flatten') -list_parent_indices = _simple_unary_function('list_parent_indices') -list_value_length = _simple_unary_function('list_value_length') - -add = _simple_binary_function('add') -subtract = _simple_binary_function('subtract') -multiply = _simple_binary_function('multiply') - -equal = _simple_binary_function('equal') -not_equal = _simple_binary_function('not_equal') -greater = _simple_binary_function('greater') -greater_equal = _simple_binary_function('greater_equal') -less = _simple_binary_function('less') -less_equal = _simple_binary_function('less_equal') - - def match_substring(array, pattern): """ Test if substring *pattern* is contained within a value of a string array. @@ -165,7 +250,7 @@ def match_substring(array, pattern): result : pyarrow.Array or pyarrow.ChunkedArray """ return call_function("match_substring", [array], - _pc.MatchSubstringOptions(pattern)) + MatchSubstringOptions(pattern)) def sum(array): @@ -253,7 +338,7 @@ def filter(data, mask, null_selection_behavior='drop'): return call_function('filter', [data, mask], options) -def take(data, indices, boundscheck=True): +def take(data, indices, *, boundscheck=True, memory_pool=None): """ Select values (or records) from array- or table-like data given integer selection indices. @@ -290,8 +375,8 @@ def take(data, indices, boundscheck=True): null ] """ - options = TakeOptions(boundscheck) - return call_function('take', [data, indices], options) + options = TakeOptions(boundscheck=boundscheck) + return call_function('take', [data, indices], options, memory_pool) def fill_null(values, fill_value): diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 52e32a356fe..9a3451e4f82 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1707,6 +1707,17 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: " arrow::compute::TakeOptions"(CFunctionOptions): c_bool boundscheck + enum CMinMaxMode \ + "arrow::compute::MinMaxOptions::Mode": + CMinMaxMode_SKIP \ + "arrow::compute::MinMaxOptions::SKIP" + CMinMaxMode_EMIT_NULL \ + "arrow::compute::MinMaxOptions::EMIT_NULL" + + cdef cppclass CMinMaxOptions \ + "arrow::compute::MinMaxOptions"(CFunctionOptions): + CMinMaxMode null_handling + enum DatumType" arrow::Datum::type": DatumType_NONE" arrow::Datum::NONE" DatumType_SCALAR" arrow::Datum::SCALAR" diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 129c7826759..c5c3a4f6a6c 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -16,8 +16,11 @@ # under the License. from functools import lru_cache -import numpy as np +import pickle import pytest +import textwrap + +import numpy as np import pyarrow as pa import pyarrow.compute as pc @@ -77,6 +80,82 @@ def test_exported_functions(): func(*args) +def test_list_functions(): + assert len(pc.list_functions()) > 10 + assert "add" in pc.list_functions() + + +def _check_get_function(name, expected_func_cls, expected_ker_cls, + min_num_kernels=1): + func = pc.get_function(name) + assert isinstance(func, expected_func_cls) + n = func.num_kernels + assert n >= min_num_kernels + assert n == len(func.kernels) + assert all(isinstance(ker, expected_ker_cls) for ker in func.kernels) + + +def test_get_function_scalar(): + _check_get_function("add", pc.ScalarFunction, pc.ScalarKernel, 8) + + +def test_get_function_vector(): + _check_get_function("unique", pc.VectorFunction, pc.VectorKernel, 8) + + +def test_get_function_aggregate(): + _check_get_function("mean", pc.ScalarAggregateFunction, + pc.ScalarAggregateKernel, 8) + + +def test_call_function_with_memory_pool(): + arr = pa.array(["foo", "bar", "baz"]) + indices = np.array([2, 2, 1]) + result1 = arr.take(indices) + result2 = pc.call_function('take', [arr, indices], + memory_pool=pa.default_memory_pool()) + expected = pa.array(["baz", "baz", "bar"]) + assert result1.equals(expected) + assert result2.equals(expected) + + result3 = pc.take(arr, indices, memory_pool=pa.default_memory_pool()) + assert result3.equals(expected) + + +def test_pickle_functions(): + # Pickle registered functions + for name in pc.list_functions(): + func = pc.get_function(name) + reconstructed = pickle.loads(pickle.dumps(func)) + assert type(reconstructed) is type(func) + assert reconstructed.name == func.name + assert reconstructed.arity == func.arity + assert reconstructed.num_kernels == func.num_kernels + + +def test_pickle_global_functions(): + # Pickle global wrappers (manual or automatic) of registered functions + for name in pc.list_functions(): + func = getattr(pc, name) + reconstructed = pickle.loads(pickle.dumps(func)) + assert reconstructed is func + + +def test_function_attributes(): + # Sanity check attributes of registered functions + for name in pc.list_functions(): + func = pc.get_function(name) + assert isinstance(func, pc.Function) + assert func.name == name + kernels = func.kernels + assert func.num_kernels == len(kernels) + assert all(isinstance(ker, pc.Kernel) for ker in kernels) + assert func.arity >= 1 # no varargs functions for now + repr(func) + for ker in kernels: + repr(ker) + + @pytest.mark.parametrize('arrow_type', numerical_arrow_types) def test_sum_array(arrow_type): arr = pa.array([1, 2, 3, 4], type=arrow_type) @@ -111,7 +190,6 @@ def test_sum_chunked_array(arrow_type): def test_mode_array(): # ARROW-9917 - arr = pa.array([1, 1, 3, 4, 3, 5], type='int64') expected = {"mode": 1, "count": 2} assert pc.mode(arr).as_py() == {"mode": 1, "count": 2} @@ -123,7 +201,6 @@ def test_mode_array(): def test_mode_chunked_array(): # ARROW-9917 - arr = pa.chunked_array([pa.array([1, 1, 3, 4, 3, 5], type='int64')]) expected = {"mode": 1, "count": 2} assert pc.mode(arr).as_py() == expected @@ -141,6 +218,75 @@ def test_match_substring(): assert expected.equals(result) +def test_min_max(): + # An example generated function wrapper with possible options + data = [4, 5, 6, None, 1] + s = pc.min_max(data) + assert s.as_py() == {'min': 1, 'max': 6} + s = pc.min_max(data, options=pc.MinMaxOptions()) + assert s.as_py() == {'min': 1, 'max': 6} + s = pc.min_max(data, options=pc.MinMaxOptions(null_handling='skip')) + assert s.as_py() == {'min': 1, 'max': 6} + s = pc.min_max(data, options=pc.MinMaxOptions(null_handling='emit_null')) + assert s.as_py() == {'min': None, 'max': None} + + # Options as dict of kwargs + s = pc.min_max(data, options={'null_handling': 'emit_null'}) + assert s.as_py() == {'min': None, 'max': None} + # Options as named functions arguments + s = pc.min_max(data, null_handling='emit_null') + assert s.as_py() == {'min': None, 'max': None} + + # Both options and named arguments + with pytest.raises(TypeError): + s = pc.min_max(data, options=pc.MinMaxOptions(), + null_handling='emit_null') + + # Wrong options type + options = pc.TakeOptions() + with pytest.raises(TypeError): + s = pc.min_max(data, options=options) + + +def test_is_valid(): + # An example generated function wrapper without options + data = [4, 5, None] + assert pc.is_valid(data).to_pylist() == [True, True, False] + + with pytest.raises(TypeError): + pc.is_valid(data, options=None) + + +def test_generated_docstrings(): + assert pc.min_max.__doc__ == textwrap.dedent("""\ + Call compute function 'min_max' with the given argument. + + Parameters + ---------- + arg : Array-like or scalar-like + Argument to compute function + memory_pool : pyarrow.MemoryPool, optional + If not passed, will allocate memory from the default memory pool. + options : pyarrow.compute.MinMaxOptions, optional + Parameters altering compute function semantics + **kwargs: optional + Parameters for MinMaxOptions constructor. Either `options` + or `**kwargs` can be passed, but not both at the same time. + """) + assert pc.add.__doc__ == textwrap.dedent("""\ + Call compute function 'add' with the given arguments. + + Parameters + ---------- + left : Array-like or scalar-like + First argument to compute function + right : Array-like or scalar-like + Second argument to compute function + memory_pool : pyarrow.MemoryPool, optional + If not passed, will allocate memory from the default memory pool. + """) + + # We use isprintable to find about codepoints that Python doesn't know, but # utf8proc does (or in a future version of Python the other way around). # These codepoints cannot be compared between Arrow and the Python @@ -259,11 +405,13 @@ def test_string_py_compat_boolean(function_name, variant): # the issues we know of, we skip if i in ignore: continue + # Compare results with the equivalent Python predicate + # (except "is_space" where functions are known to be incompatible) c = chr(i) - if hasattr(pc, arrow_name): + if hasattr(pc, arrow_name) and function_name != 'is_space': ar = pa.array([c]) - assert getattr(pc, arrow_name)( - ar)[0].as_py() == getattr(c, py_name)() + arrow_func = getattr(pc, arrow_name) + assert arrow_func(ar)[0].as_py() == getattr(c, py_name)() @pytest.mark.parametrize(('ty', 'values'), all_array_types) @@ -347,17 +495,6 @@ def test_take_on_chunked_array(): assert result.equals(expected) -def test_call_function_with_memory_pool(): - arr = pa.array(["foo", "bar", "baz"]) - indices = np.array([2, 2, 1]) - result1 = arr.take(indices) - result2 = pc.call_function('take', [arr, indices], - memory_pool=pa.default_memory_pool()) - expected = pa.array(["baz", "baz", "bar"]) - assert result1.equals(expected) - assert result2.equals(expected) - - @pytest.mark.parametrize('ordered', [False, True]) def test_take_dictionary(ordered): arr = pa.DictionaryArray.from_arrays([0, 1, 2, 0, 1, 2], ['a', 'b', 'c'], diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 2fced04fcce..e2ae9a0fdc6 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -168,7 +168,7 @@ std::shared_ptr make_compute_options( using Options = arrow::compute::MinMaxOptions; auto out = std::make_shared(Options::Defaults()); out->null_handling = - cpp11::as_cpp(options["na.rm"]) ? Options::SKIP : Options::OUTPUT_NULL; + cpp11::as_cpp(options["na.rm"]) ? Options::SKIP : Options::EMIT_NULL; return out; } From 6e82d7b0046348780ca0077c00a35c8d940fa8df Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 10 Sep 2020 19:02:05 +0200 Subject: [PATCH 2/4] Add automatic scalar conversion --- .../compute/kernels/scalar_set_lookup.cc | 23 ++++++++++--------- python/pyarrow/_compute.pyx | 19 +++++++++++++-- python/pyarrow/tests/test_compute.py | 16 +++++++++++-- 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc b/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc index 726b01fb477..c5d3919db5b 100644 --- a/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc +++ b/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc @@ -444,17 +444,18 @@ void RegisterScalarSetLookup(FunctionRegistry* registry) { // IndexIn uses Int32Builder and so is responsible for all its own allocation { - ScalarKernel match_base; - match_base.init = InitSetLookup; - match_base.exec = ExecIndexIn; - match_base.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE; - match_base.mem_allocation = MemAllocation::NO_PREALLOCATE; - auto match = std::make_shared("index_in", Arity::Unary()); - AddBasicSetLookupKernels(match_base, /*output_type=*/int32(), match.get()); - - match_base.signature = KernelSignature::Make({null()}, int32()); - DCHECK_OK(match->AddKernel(match_base)); - DCHECK_OK(registry->AddFunction(match)); + ScalarKernel index_in_base; + index_in_base.init = InitSetLookup; + index_in_base.exec = ExecIndexIn; + index_in_base.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE; + index_in_base.mem_allocation = MemAllocation::NO_PREALLOCATE; + auto index_in = std::make_shared("index_in", Arity::Unary()); + + AddBasicSetLookupKernels(index_in_base, /*output_type=*/int32(), index_in.get()); + + index_in_base.signature = KernelSignature::Make({null()}, int32()); + DCHECK_OK(index_in->AddKernel(index_in_base)); + DCHECK_OK(registry->AddFunction(index_in)); DCHECK_OK(registry->AddFunction(std::make_shared())); } diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 8026587fa48..b4bd9d2b0cc 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -354,17 +354,32 @@ cdef _pack_compute_args(object values, vector[CDatum]* out): if isinstance(val, Array): out.push_back(CDatum(( val).sp_array)) + continue elif isinstance(val, ChunkedArray): out.push_back(CDatum(( val).sp_chunked_array)) + continue elif isinstance(val, Scalar): out.push_back(CDatum(( val).unwrap())) + continue elif isinstance(val, RecordBatch): out.push_back(CDatum(( val).sp_batch)) + continue elif isinstance(val, Table): out.push_back(CDatum(( val).sp_table)) + continue else: - raise TypeError("Got unexpected argument type {} " - "for compute function".format(type(val))) + # Is it a Python scalar? + try: + scal = lib.scalar(val) + except Exception: + # Raise dedicated error below + pass + else: + out.push_back(CDatum(( scal).unwrap())) + continue + + raise TypeError("Got unexpected argument type {} " + "for compute function".format(type(val))) cdef class FunctionRegistry(_Weakrefable): diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index c5c3a4f6a6c..3c2e0865f8d 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -73,10 +73,10 @@ def test_exported_functions(): functions = exported_functions assert len(functions) >= 10 for func in functions: - args = [None] * func.__arrow_compute_function__['arity'] + args = [object()] * func.__arrow_compute_function__['arity'] with pytest.raises(TypeError, match="Got unexpected argument type " - " for compute function"): + " for compute function"): func(*args) @@ -156,6 +156,18 @@ def test_function_attributes(): repr(ker) +def test_input_type_conversion(): + # Automatic array conversion from Python + arr = pc.add([1, 2], [4, None]) + assert arr.to_pylist() == [5, None] + # Automatic scalar conversion from Python + arr = pc.add([1, 2], 4) + assert arr.to_pylist() == [5, 6] + # Other scalar type + assert pc.equal(["foo", "bar", None], + "foo").to_pylist() == [True, False, None] + + @pytest.mark.parametrize('arrow_type', numerical_arrow_types) def test_sum_array(arrow_type): arr = pa.array([1, 2, 3, 4], type=arrow_type) From 648ee8566fb6317385fe9b41e42b97269e030f7d Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Sep 2020 15:35:30 +0200 Subject: [PATCH 3/4] Address review comment --- python/pyarrow/_compute.pyx | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index b4bd9d2b0cc..6fbe1581f6a 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -184,6 +184,7 @@ cdef class Function(_Weakrefable): * "aggregate" functions reduce the dimensionality of the inputs by applying a reduction function. Examples: sum, minmax, mode... + * "meta" functions dispatch to other functions. """ cdef: shared_ptr[CFunction] sp_func From b4a67185cf333ffcb33ffbda5cf08ecaa50b294f Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Sep 2020 16:58:19 +0200 Subject: [PATCH 4/4] Fix ASAN/UBSAN job --- cpp/src/arrow/compute/function.cc | 3 +++ cpp/src/arrow/compute/kernels/scalar_set_lookup.cc | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 6205274cc96..91da1ae76ff 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -192,6 +192,9 @@ Result MetaFunction::Execute(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const { RETURN_NOT_OK(CheckArity(static_cast(args.size()))); + if (options == nullptr) { + options = default_options(); + } return ExecuteImpl(args, options, ctx); } diff --git a/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc b/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc index c5d3919db5b..4954d23b05c 100644 --- a/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc +++ b/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc @@ -404,7 +404,9 @@ class IsInMetaBinary : public MetaFunction { Result ExecuteImpl(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const override { - DCHECK_EQ(options, nullptr); + if (options != nullptr) { + return Status::Invalid("Unexpected options for 'is_in_meta_binary' function"); + } return IsIn(args[0], args[1], ctx); } }; @@ -417,7 +419,9 @@ class IndexInMetaBinary : public MetaFunction { Result ExecuteImpl(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const override { - DCHECK_EQ(options, nullptr); + if (options != nullptr) { + return Status::Invalid("Unexpected options for 'index_in_meta_binary' function"); + } return IndexIn(args[0], args[1], ctx); } };