From 6749a77f60138cb44fb0de1c913a19afc4abdf71 Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Mon, 3 Oct 2022 22:38:15 +0530 Subject: [PATCH 1/5] feat(initial): wip --- python/pyarrow/_compute.pxd | 6 +++++ python/pyarrow/_compute.pyx | 33 ++++++++++++++++++++++---- python/pyarrow/compute.py | 1 + python/pyarrow/includes/libarrow.pxd | 6 +++++ python/pyarrow/src/arrow/python/udf.cc | 3 +-- python/pyarrow/src/arrow/python/udf.h | 1 + python/pyarrow/tests/test_udf.py | 27 +++++++++++++++++++++ 7 files changed, 70 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_compute.pxd index 8b09cbd445e..c5d49c467c7 100644 --- a/python/pyarrow/_compute.pxd +++ b/python/pyarrow/_compute.pxd @@ -27,6 +27,12 @@ cdef class ScalarUdfContext(_Weakrefable): cdef void init(self, const CScalarUdfContext& c_context) +cdef class FunctionRegistry(_Weakrefable): + cdef: + CFunctionRegistry* registry + + cdef void init(self, const unique_ptr[CFunctionRegistry]& registry) + cdef class FunctionOptions(_Weakrefable): cdef: shared_ptr[CFunctionOptions] wrapped diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 2aa65e75c50..dc0035eff57 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -477,11 +477,13 @@ cdef _pack_compute_args(object values, vector[CDatum]* out): cdef class FunctionRegistry(_Weakrefable): - cdef CFunctionRegistry* registry def __init__(self): self.registry = GetFunctionRegistry() + cdef void init(self, const unique_ptr[CFunctionRegistry]& registry): + self.registry = registry.get() + def list_functions(self): """ Return all function names in the registry. 
@@ -505,13 +507,26 @@ cdef class FunctionRegistry(_Weakrefable): func = GetResultValue(self.registry.GetFunction(c_name)) return wrap_function(func) - cdef FunctionRegistry _global_func_registry = FunctionRegistry() +cdef object box_registry(const unique_ptr[CFunctionRegistry]& c_registry): + cdef FunctionRegistry registry = FunctionRegistry.__new__(FunctionRegistry) + registry.init(c_registry) + return registry -def function_registry(): - return _global_func_registry +def make_registry(parent): + cdef: + FunctionRegistry default + unique_ptr[CFunctionRegistry] up_registry + if parent is None: + raise ValueError("Parent registry not provided") + else: + default = (parent) + up_registry = CFunctionRegistry.Make(default.registry) + return box_registry(up_registry) +def function_registry(): + return _global_func_registry def get_function(name): """ @@ -2515,7 +2530,7 @@ def _get_scalar_udf_context(memory_pool, batch_length): def register_scalar_function(func, function_name, function_doc, in_types, - out_type): + out_type, registry=function_registry()): """ Register a user-defined scalar function. 
@@ -2593,6 +2608,8 @@ def register_scalar_function(func, function_name, function_doc, in_types, PyObject* c_function shared_ptr[CDataType] c_out_type CScalarUdfOptions c_options + CFunctionRegistry* c_registry + FunctionRegistry func_registry if callable(func): c_function = func @@ -2628,11 +2645,17 @@ def register_scalar_function(func, function_name, function_doc, in_types, c_out_type = pyarrow_unwrap_data_type(ensure_type(out_type)) + func_registry = (registry) + c_options.func_name = c_func_name c_options.arity = c_arity c_options.func_doc = c_func_doc c_options.input_types = c_in_types c_options.output_type = c_out_type + c_options.registry = func_registry.registry + + print(">>reg1") check_status(RegisterScalarFunction(c_function, &_scalar_udf_callback, c_options)) + diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 5873571c5a0..169488515ab 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -75,6 +75,7 @@ # Functions call_function, function_registry, + make_registry, get_function, list_functions, _group_by, diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index e44fa2615e2..1182a7f7d75 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1955,6 +1955,10 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: const c_string& name) const vector[c_string] GetFunctionNames() const int num_functions() const + + @staticmethod + unique_ptr[CFunctionRegistry] Make(CFunctionRegistry* parent) + CFunctionRegistry* GetFunctionRegistry() @@ -2762,6 +2766,7 @@ cdef extern from "arrow/python/udf.h" namespace "arrow::py": cdef cppclass CScalarUdfContext" arrow::py::ScalarUdfContext": CMemoryPool *pool int64_t batch_length + CFunctionRegistry *registry cdef cppclass CScalarUdfOptions" arrow::py::ScalarUdfOptions": c_string func_name @@ -2769,6 +2774,7 @@ cdef extern from "arrow/python/udf.h" namespace "arrow::py": CFunctionDoc 
func_doc vector[shared_ptr[CDataType]] input_types shared_ptr[CDataType] output_type + CFunctionRegistry* registry CStatus RegisterScalarFunction(PyObject* function, function[CallbackUdf] wrapper, const CScalarUdfOptions& options) diff --git a/python/pyarrow/src/arrow/python/udf.cc b/python/pyarrow/src/arrow/python/udf.cc index 81bf47c0ade..fe50133713c 100644 --- a/python/pyarrow/src/arrow/python/udf.cc +++ b/python/pyarrow/src/arrow/python/udf.cc @@ -115,8 +115,7 @@ Status RegisterScalarFunction(PyObject* user_function, ScalarUdfWrapperCallback kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE; kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE; RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel))); - auto registry = compute::GetFunctionRegistry(); - RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func))); + RETURN_NOT_OK(options.registry->AddFunction(std::move(scalar_func))); return Status::OK(); } diff --git a/python/pyarrow/src/arrow/python/udf.h b/python/pyarrow/src/arrow/python/udf.h index 9a3666459fd..e3784baf632 100644 --- a/python/pyarrow/src/arrow/python/udf.h +++ b/python/pyarrow/src/arrow/python/udf.h @@ -39,6 +39,7 @@ struct ARROW_PYTHON_EXPORT ScalarUdfOptions { compute::FunctionDoc func_doc; std::vector> input_types; std::shared_ptr output_type; + compute::FunctionRegistry* registry; }; /// \brief A context passed as the first argument of scalar UDF functions. 
diff --git a/python/pyarrow/tests/test_udf.py b/python/pyarrow/tests/test_udf.py index e711619582d..fa597f9308b 100644 --- a/python/pyarrow/tests/test_udf.py +++ b/python/pyarrow/tests/test_udf.py @@ -504,3 +504,30 @@ def test_input_lifetime(unary_func_fixture): # Calling a UDF should not have kept `v` alive longer than required v = None assert proxy_pool.bytes_allocated() == 0 + +def test_function_registry(): + def f1(ctx, x): + return pc.call_function("add", [x, 1], + memory_pool=ctx.memory_pool) + func_name = "f1" + unary_doc = {"summary": "add function", + "description": "test add function"} + default_registry = pc.function_registry() + registry1 = pc.make_registry(default_registry) + + print(type(registry1)) + print(registry1.get_function("add")) + # pc.register_scalar_function(f1, + # func_name, + # unary_doc, + # {"array": pa.int64()}, + # pa.int64(), + # registry1) + + # assert pc.get_function(func_name).name == func_name + + # parent_registry = None + + # print(pc.get_function(func_name)) + + \ No newline at end of file From ff2cc1fa3b1b5f42076986e9aecdc8620589455f Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Tue, 4 Oct 2022 16:39:06 +0530 Subject: [PATCH 2/5] feat(initial): adding intial functionality for multi-registry usage for udfs --- python/pyarrow/_compute.pxd | 2 - python/pyarrow/_compute.pyx | 45 ++++++++++++---------- python/pyarrow/includes/libarrow.pxd | 3 +- python/pyarrow/tests/test_udf.py | 56 ++++++++++++++++++---------- 4 files changed, 64 insertions(+), 42 deletions(-) diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_compute.pxd index c5d49c467c7..545c730831a 100644 --- a/python/pyarrow/_compute.pxd +++ b/python/pyarrow/_compute.pxd @@ -30,8 +30,6 @@ cdef class ScalarUdfContext(_Weakrefable): cdef class FunctionRegistry(_Weakrefable): cdef: CFunctionRegistry* registry - - cdef void init(self, const unique_ptr[CFunctionRegistry]& registry) cdef class FunctionOptions(_Weakrefable): cdef: diff --git 
a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index dc0035eff57..7098284d5a2 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -478,11 +478,19 @@ cdef _pack_compute_args(object values, vector[CDatum]* out): cdef class FunctionRegistry(_Weakrefable): - def __init__(self): - self.registry = GetFunctionRegistry() + def __init__(self, parent=None): + cdef: + FunctionRegistry new_rg + FunctionRegistry parent_rg + if parent is None: + self.registry = GetFunctionRegistry() + else: + parent_rg = (parent) + self.registry = CFunctionRegistry.Make( + parent_rg.registry).release() - cdef void init(self, const unique_ptr[CFunctionRegistry]& registry): - self.registry = registry.get() + def __dealloc__(self): + self.clear() def list_functions(self): """ @@ -507,26 +515,28 @@ cdef class FunctionRegistry(_Weakrefable): func = GetResultValue(self.registry.GetFunction(c_name)) return wrap_function(func) + def clear(self): + del self.registry + cdef FunctionRegistry _global_func_registry = FunctionRegistry() -cdef object box_registry(const unique_ptr[CFunctionRegistry]& c_registry): - cdef FunctionRegistry registry = FunctionRegistry.__new__(FunctionRegistry) - registry.init(c_registry) - return registry + +def function_registry(): + return _global_func_registry + def make_registry(parent): cdef: - FunctionRegistry default - unique_ptr[CFunctionRegistry] up_registry + FunctionRegistry new_rg + FunctionRegistry parent_rg if parent is None: - raise ValueError("Parent registry not provided") + return function_registry() else: - default = (parent) - up_registry = CFunctionRegistry.Make(default.registry) - return box_registry(up_registry) + parent_rg = (parent) + new_rg = FunctionRegistry.__new__(FunctionRegistry) + new_rg.registry = CFunctionRegistry.Make(parent_rg.registry).release() + return new_rg -def function_registry(): - return _global_func_registry def get_function(name): """ @@ -2654,8 +2664,5 @@ def register_scalar_function(func, 
function_name, function_doc, in_types, c_options.output_type = c_out_type c_options.registry = func_registry.registry - print(">>reg1") - check_status(RegisterScalarFunction(c_function, &_scalar_udf_callback, c_options)) - diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 1182a7f7d75..79331083e70 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1955,11 +1955,10 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: const c_string& name) const vector[c_string] GetFunctionNames() const int num_functions() const - + @staticmethod unique_ptr[CFunctionRegistry] Make(CFunctionRegistry* parent) - CFunctionRegistry* GetFunctionRegistry() cdef cppclass CElementWiseAggregateOptions \ diff --git a/python/pyarrow/tests/test_udf.py b/python/pyarrow/tests/test_udf.py index fa597f9308b..0d9db8cf689 100644 --- a/python/pyarrow/tests/test_udf.py +++ b/python/pyarrow/tests/test_udf.py @@ -505,29 +505,47 @@ def test_input_lifetime(unary_func_fixture): v = None assert proxy_pool.bytes_allocated() == 0 -def test_function_registry(): + +def test_nested_function_registry(): def f1(ctx, x): return pc.call_function("add", [x, 1], memory_pool=ctx.memory_pool) func_name = "f1" unary_doc = {"summary": "add function", "description": "test add function"} + default_registry = pc.function_registry() - registry1 = pc.make_registry(default_registry) - - print(type(registry1)) - print(registry1.get_function("add")) - # pc.register_scalar_function(f1, - # func_name, - # unary_doc, - # {"array": pa.int64()}, - # pa.int64(), - # registry1) - - # assert pc.get_function(func_name).name == func_name - - # parent_registry = None - - # print(pc.get_function(func_name)) - - \ No newline at end of file + + registry1 = pc.FunctionRegistry(default_registry) + + registry2 = pc.FunctionRegistry(registry1) + + pc.register_scalar_function(f1, + func_name, + unary_doc, + {"array": pa.int64()}, + 
pa.int64(), + registry2) + + assert registry2.get_function(func_name).name == func_name + + error_msg = "No function registered with name: f1" + with pytest.raises(pa.lib.ArrowKeyError, match=error_msg): + registry1.get_function(func_name) + + pc.register_scalar_function(f1, + func_name, + unary_doc, + {"array": pa.int64()}, + pa.int64(), + registry1) + assert registry1.get_function(func_name).name == func_name + + pc.register_scalar_function(f1, + func_name, + unary_doc, + {"array": pa.int64()}, + pa.int64(), + default_registry) + + assert default_registry.get_function(func_name).name == func_name From ef6f57084234887e58f7c66cb1e039e4f9b66133 Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Tue, 4 Oct 2022 16:49:06 +0530 Subject: [PATCH 3/5] fix(docs): added func registry docs and cleanup --- python/pyarrow/_compute.pyx | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 7098284d5a2..0d17c1d1226 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -525,19 +525,6 @@ def function_registry(): return _global_func_registry -def make_registry(parent): - cdef: - FunctionRegistry new_rg - FunctionRegistry parent_rg - if parent is None: - return function_registry() - else: - parent_rg = (parent) - new_rg = FunctionRegistry.__new__(FunctionRegistry) - new_rg.registry = CFunctionRegistry.Make(parent_rg.registry).release() - return new_rg - - def get_function(name): """ Get a function by name. @@ -2581,6 +2568,12 @@ def register_scalar_function(func, function_name, function_doc, in_types, arity. out_type : DataType Output type of the function. + registry: FunctionRegistry + FunctionRegistry with which the function will be registered. + The default value is set to the global function registry. 
+ This is an optional feature to allow grouping functions into + different registries to enable removing functions if they + are not intended to be used further. Examples -------- From e21095a7038be0dded2e77f01a9f0f28b6a390ad Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Wed, 5 Oct 2022 11:37:47 +0530 Subject: [PATCH 4/5] fix(typo): removed function cleanup --- python/pyarrow/compute.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 169488515ab..5873571c5a0 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -75,7 +75,6 @@ # Functions call_function, function_registry, - make_registry, get_function, list_functions, _group_by, From 198ec825c54ca5385b2a1d77651f49bbd1a16f8e Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Tue, 11 Oct 2022 15:56:36 +0530 Subject: [PATCH 5/5] feat(multi-process): using registry in two separate processes --- python/pyarrow/includes/libarrow.pxd | 1 + python/pyarrow/tests/test_udf.py | 63 ++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 79331083e70..1d027a3451b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1853,6 +1853,7 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: CExecContext() CExecContext(CMemoryPool* pool) CExecContext(CMemoryPool* pool, CExecutor* exc) + CExecContext(CMemoryPool* pool, CExecutor* exc, CFunctionRegistry* rgr) CMemoryPool* memory_pool() const CExecutor* executor() diff --git a/python/pyarrow/tests/test_udf.py b/python/pyarrow/tests/test_udf.py index 0d9db8cf689..66aae8529eb 100644 --- a/python/pyarrow/tests/test_udf.py +++ b/python/pyarrow/tests/test_udf.py @@ -17,6 +17,7 @@ import pytest +import multiprocessing as mp import pyarrow as pa from pyarrow import compute as pc @@ -549,3 +550,65 @@ def f1(ctx, x): 
default_registry) assert default_registry.get_function(func_name).name == func_name + + +def parallel_task1(data, q): + def f1(ctx, x): + return pc.call_function("add", [x, 10], + memory_pool=ctx.memory_pool) + func_name = "f1" + unary_doc = {"summary": "add function", + "description": "test add function"} + + default_registry = pc.function_registry() + + registry1 = pc.FunctionRegistry(default_registry) + pc.register_scalar_function(f1, + func_name, + unary_doc, + {"array": pa.int64()}, + pa.int64(), + registry1) + func = registry1.get_function(func_name) + result = func.call(data) + q.put(result) + + +def parallel_task2(data, q): + def f1(ctx, x): + return pc.call_function("multiply", [x, 10], + memory_pool=ctx.memory_pool) + func_name = "f1" + unary_doc = {"summary": "multiply function", + "description": "test multiply function"} + + default_registry = pc.function_registry() + + registry1 = pc.FunctionRegistry(default_registry) + pc.register_scalar_function(f1, + func_name, + unary_doc, + {"array": pa.int64()}, + pa.int64(), + registry1) + func = registry1.get_function(func_name) + q.put(func.call(data)) + + +def test_udf_usage_by_scope(): + # With support for custom function registration on global and + # nested function registries, the user has the ability to make a + # scope for a registry and get some tasks done in a particular + # process and drop the registry once the process terminates. + ctx = mp.get_context('spawn') + q = ctx.Queue() + p1 = ctx.Process(target=parallel_task1, args=( + [pa.array([10, 20, 30], pa.int64())], q)) + p1.start() + result = q.get() + p1.join() + p2 = ctx.Process(target=parallel_task2, args=([result], q)) + p2.start() + final_result = q.get() + p2.join() + assert final_result == pa.array([200, 300, 400], pa.int64())