From 949de9d27bb54bd03023ba995564881fd5f89577 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 29 Jun 2021 10:54:54 -0400 Subject: [PATCH 1/9] Sketch of a general purpose cython trampolining utility --- cpp/src/arrow/python/common.h | 56 ++++++++++++++++++++ python/pyarrow/_dataset.pyx | 40 +++++++++++++- python/pyarrow/dataset.py | 6 ++- python/pyarrow/includes/common.pxd | 1 + python/pyarrow/includes/libarrow_dataset.pxd | 8 +++ python/pyarrow/includes/libarrow_fs.pxd | 4 ++ python/pyarrow/tests/test_dataset.py | 8 ++- 7 files changed, 119 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h index 8560fa2d6f4..71110cf084c 100644 --- a/cpp/src/arrow/python/common.h +++ b/cpp/src/arrow/python/common.h @@ -185,6 +185,62 @@ class ARROW_PYTHON_EXPORT OwnedRefNoGIL : public OwnedRef { } }; +template +struct BoundFunction; + +template +struct BoundFunction { + // We bind `cdef void fn(object, ...)` to get a `Status(...)` + // where the Status contains any Python error raised by `fn` + using Unbound = void(PyObject*, Args...); + using Bound = Status(Args...); + + BoundFunction(Unbound* unbound, PyObject* bound_arg) + : bound_arg_(bound_arg), unbound_(unbound) {} + + Status Invoke(Args... args) const { + unbound_(bound_arg_.obj(), std::forward(args)...); + return CheckPyError(); + } + + Unbound* unbound_; + OwnedRefNoGIL bound_arg_; +}; + +template +struct BoundFunction { + // We bind `cdef Return fn(object, ...)` to get a `Result(...)` + // where the Result contains any Python error raised by `fn` or the + // return value from `fn`. + using Unbound = Return(PyObject*, Args...); + using Bound = Result(Args...); + + BoundFunction(Unbound* unbound, PyObject* bound_arg) + : bound_arg_(bound_arg), unbound_(unbound) {} + + Result Invoke(Args... args) const { + Return ret = unbound_(bound_arg_.obj(), std::forward(args)...); + RETURN_NOT_OK(CheckPyError()); + return ret; + } + + Unbound* unbound_; + OwnedRefNoGIL bound_arg_; +}; + +template +std::function BindFunction(Return (*unbound)(PyObject*, Args...), + PyObject* bound_arg) { + using Fn = BoundFunction; + + static_assert(std::is_same::value, + "requested bound function of unsupported type"); + + Py_XINCREF(bound_arg); + auto bound_fn = std::make_shared(unbound, bound_arg); + return [bound_fn](Args... args) { return bound_fn->Invoke(args...); }; +} + // A temporary conversion of a Python object to a bytes area. struct PyBytesView { const char* bytes; diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index bd93da9cb18..06211c8e133 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2994,7 +2994,7 @@ def _get_partition_keys(Expression partition_expression): For example, an expression of - is converted to {'part': 'a', 'year': 2016} + is converted to {'part': 'A', 'year': 2016} """ cdef: CExpression expr = partition_expression.unwrap() @@ -3009,6 +3009,40 @@ def _get_partition_keys(Expression partition_expression): return out +cdef class FragmentWriter(_Weakrefable): + cdef: + CFileWriter* writer + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef void init(self, CFileWriter* writer): + self.writer = writer + + @staticmethod + cdef wrap(CFileWriter* writer): + cdef FragmentWriter self = FragmentWriter.__new__(FragmentWriter) + self.init(writer) + return self + + @property + def path(self): + cdef bytes path = deref(self.writer).destination().path + return path.decode('utf-8') + + @property + def filesystem(self): + cdef: + shared_ptr[CFileSystem] c_filesystem + c_filesystem = deref(self.writer).destination().filesystem + return FileSystem.wrap(c_filesystem) + + @staticmethod + cdef void _wrap_visitor(PyObject* raw_visitor, CFileWriter* writer) except *: + visitor = PyObject_to_object(raw_visitor) + visitor(FragmentWriter.wrap(writer)) + + def _filesystemdataset_write( Scanner data not None, object base_dir not None, @@ -3017,6 +3051,7 @@ def _filesystemdataset_write( Partitioning partitioning not None, FileWriteOptions file_options not None, int max_partitions, + writer_pre_finish=None, ): """ CFileSystemDataset.Write wrapper @@ -3032,6 +3067,9 @@ def _filesystemdataset_write( c_options.partitioning = partitioning.unwrap() c_options.max_partitions = max_partitions c_options.basename_template = tobytes(basename_template) + if writer_pre_finish is not None: + c_options.writer_pre_finish = BindFunction[CFileWriterVisitor]( + &FragmentWriter._wrap_visitor, writer_pre_finish) c_scanner = data.unwrap() with nogil: diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index b93f492dd38..b13f4461c9b 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -34,6 +34,7 @@ FileSystemFactoryOptions, FileWriteOptions, Fragment, + FragmentWriter, HivePartitioning, IpcFileFormat, IpcFileWriteOptions, @@ -690,7 +691,8 @@ def _ensure_write_partitioning(scheme): def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, schema=None, filesystem=None, file_options=None, use_threads=True, - use_async=False, max_partitions=None): + use_async=False, max_partitions=None, + writer_pre_finish=None): """ Write a dataset to a given format and partitioning. @@ -784,5 +786,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None, _filesystemdataset_write( scanner, base_dir, basename_template, filesystem, partitioning, - file_options, max_partitions + file_options, max_partitions, writer_pre_finish ) diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 3f67a3256cc..902eaafbbbd 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -128,6 +128,7 @@ cdef extern from "arrow/result.h" namespace "arrow" nogil: cdef extern from "arrow/python/common.h" namespace "arrow::py" nogil: T GetResultValue[T](CResult[T]) except * + cdef function[F] BindFunction[F](void* unbound, object bound, ...) cdef inline object PyObject_to_object(PyObject* o): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 8cab5536647..970d455e43c 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -235,6 +235,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: vector[int] row_group_ids) CStatus EnsureCompleteMetadata() + cdef cppclass CFileWriter "arrow::dataset::FileWriter": + const CFileLocator& destination() const + +ctypedef CStatus CFileWriterVisitor(CFileWriter*) + +cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: + cdef cppclass CFileSystemDatasetWriteOptions \ "arrow::dataset::FileSystemDatasetWriteOptions": shared_ptr[CFileWriteOptions] file_write_options @@ -243,6 +250,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CPartitioning] partitioning int max_partitions c_string basename_template + function[CFileWriterVisitor] writer_pre_finish cdef cppclass CFileSystemDataset \ "arrow::dataset::FileSystemDataset"(CDataset): diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd index 52ef97e5757..eef3757bff0 100644 --- a/python/pyarrow/includes/libarrow_fs.pxd +++ b/python/pyarrow/includes/libarrow_fs.pxd @@ -52,6 +52,10 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: c_bool allow_not_found c_bool recursive + cdef cppclass CFileLocator "arrow::fs::FileLocator": + shared_ptr[CFileSystem] filesystem + c_string path + cdef cppclass CFileSystem "arrow::fs::FileSystem": shared_ptr[CFileSystem] shared_from_this() c_string type_name() const diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 5e83657ebf2..65426d5de68 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3095,10 +3095,16 @@ def test_write_dataset_use_threads(tempdir): pa.schema([("part", pa.string())]), flavor="hive") target1 = tempdir / 'partitioned1' + paths_written = [] + def writer_pre_finish(writer): + pass + #paths_written.append(writer.path) ds.write_dataset( dataset, target1, format="feather", partitioning=partitioning, - use_threads=True + use_threads=True, writer_pre_finish=writer_pre_finish ) + assert paths_written == [] + target2 = tempdir / 'partitioned2' ds.write_dataset( dataset, target2, format="feather", partitioning=partitioning, From 518dea77729384e659176b1509b404d0a8e1bf14 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 29 Jun 2021 11:37:00 -0400 Subject: [PATCH 2/9] add tiny test case for BindFunction --- dev/archery/archery/cli.py | 1 - python/pyarrow/_dataset.pyx | 45 ++++++++++++++++++++++++++++ python/pyarrow/tests/test_dataset.py | 10 +++++++ 3 files changed, 55 insertions(+), 1 deletion(-) diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py index fefd7b02ed4..7fef9edb4b9 100644 --- a/dev/archery/archery/cli.py +++ b/dev/archery/archery/cli.py @@ -115,7 +115,6 @@ def _apply_options(cmd, options): help="Specify Arrow source directory") # toolchain @cpp_toolchain_options -@java_toolchain_options @click.option("--build-type", default=None, type=build_type, help="CMake's CMAKE_BUILD_TYPE") @click.option("--warn-level", default="production", type=warn_level_type, diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 06211c8e133..9b9ea4d27ad 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3074,3 +3074,48 @@ def _filesystemdataset_write( c_scanner = data.unwrap() with nogil: check_status(CFileSystemDataset.Write(c_options, c_scanner)) + + +# basic test to roundtrip through a BoundFunction + +ctypedef CStatus visit_string_cb(const c_string&) + +cdef extern from * namespace "arrow::py" nogil: + """ + #include + #include + #include + + #include "arrow/status.h" + + namespace arrow { + namespace py { + + Status VisitStrings(const std::vector& strs, + std::function cb) { + for (const std::string& str : strs) { + RETURN_NOT_OK(cb(str)); + } + return Status::OK(); + } + + } // namespace py + } // namespace arrow + """ + cdef CStatus CVisitStrings" arrow::py::VisitStrings"( + vector[c_string], function[visit_string_cb]) + + +cdef void _visit_strings_impl(py_cb, const c_string& s) except *: + py_cb(frombytes(s)) + +def _visit_strings(strings, cb): + cdef: + function[visit_string_cb] c_cb + vector[c_string] c_strings + + c_cb = BindFunction[visit_string_cb](&_visit_strings_impl, cb) + for s in strings: + c_strings.push_back(tobytes(s)) + + check_status(CVisitStrings(c_strings, c_cb)) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 65426d5de68..72d84c2f330 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3403,3 +3403,13 @@ def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader): ) table = dataset_reader.to_table(fsds) assert table.schema == schema + + +def test_visit_strings_adhoc(): + import pyarrow._dataset as _ds + + strings = ['a', 'b', 'c'] + visited = [] + _ds._visit_strings(strings, visited.append) + + assert visited == strings From 1424bfc695ce105b1154e24a8591bca89c187dee Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 29 Jun 2021 11:38:44 -0400 Subject: [PATCH 3/9] lint fixes --- python/pyarrow/_dataset.pyx | 4 +++- python/pyarrow/tests/test_dataset.py | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 9b9ea4d27ad..f8d25c43450 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3038,7 +3038,8 @@ cdef class FragmentWriter(_Weakrefable): return FileSystem.wrap(c_filesystem) @staticmethod - cdef void _wrap_visitor(PyObject* raw_visitor, CFileWriter* writer) except *: + cdef void _wrap_visitor(PyObject* raw_visitor, + CFileWriter* writer) except *: visitor = PyObject_to_object(raw_visitor) visitor(FragmentWriter.wrap(writer)) @@ -3109,6 +3110,7 @@ cdef extern from * namespace "arrow::py" nogil: cdef void _visit_strings_impl(py_cb, const c_string& s) except *: py_cb(frombytes(s)) + def _visit_strings(strings, cb): cdef: function[visit_string_cb] c_cb diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 72d84c2f330..7383e3cc24a 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3096,9 +3096,10 @@ def test_write_dataset_use_threads(tempdir): target1 = tempdir / 'partitioned1' paths_written = [] + def writer_pre_finish(writer): pass - #paths_written.append(writer.path) + # paths_written.append(writer.path) ds.write_dataset( dataset, target1, format="feather", partitioning=partitioning, use_threads=True, writer_pre_finish=writer_pre_finish From 49cb16b164d550a622ef350f49acb805bce8314e Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 29 Jun 2021 12:42:01 -0400 Subject: [PATCH 4/9] add raising test case --- cpp/src/arrow/python/common.h | 5 ++++- python/pyarrow/tests/test_dataset.py | 7 +++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h index 71110cf084c..25bc8025f26 100644 --- a/cpp/src/arrow/python/common.h +++ b/cpp/src/arrow/python/common.h @@ -199,6 +199,7 @@ struct BoundFunction { : bound_arg_(bound_arg), unbound_(unbound) {} Status Invoke(Args... args) const { + PyAcquireGIL lock; unbound_(bound_arg_.obj(), std::forward(args)...); return CheckPyError(); } @@ -219,6 +220,7 @@ struct BoundFunction { : bound_arg_(bound_arg), unbound_(unbound) {} Result Invoke(Args... args) const { + PyAcquireGIL lock; Return ret = unbound_(bound_arg_.obj(), std::forward(args)...); RETURN_NOT_OK(CheckPyError()); return ret; @@ -238,7 +240,8 @@ std::function BindFunction(Return (*unbound)(PyObject*, Args...), Py_XINCREF(bound_arg); auto bound_fn = std::make_shared(unbound, bound_arg); - return [bound_fn](Args... args) { return bound_fn->Invoke(args...); }; + return + [bound_fn](Args... args) { return bound_fn->Invoke(std::forward(args)...); }; } // A temporary conversion of a Python object to a bytes area. diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 7383e3cc24a..fb0454f1b12 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3414,3 +3414,10 @@ def test_visit_strings_adhoc(): _ds._visit_strings(strings, visited.append) assert visited == strings + + with pytest.raises(ValueError, match="wtf"): + def raise_on_b(s): + if s == 'b': + raise ValueError('wtf') + + _ds._visit_strings(strings, raise_on_b) From 4fafbe1d42bb4317d339cf525033edb300061ba6 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 29 Jun 2021 12:43:49 -0400 Subject: [PATCH 5/9] reuse RETURN_IF_PYERROR --- cpp/src/arrow/python/common.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h index 25bc8025f26..24dcb130a26 100644 --- a/cpp/src/arrow/python/common.h +++ b/cpp/src/arrow/python/common.h @@ -201,7 +201,8 @@ struct BoundFunction { Status Invoke(Args... args) const { PyAcquireGIL lock; unbound_(bound_arg_.obj(), std::forward(args)...); - return CheckPyError(); + RETURN_IF_PYERROR(); + return Status::OK(); } Unbound* unbound_; @@ -222,7 +223,7 @@ struct BoundFunction { Result Invoke(Args... args) const { PyAcquireGIL lock; Return ret = unbound_(bound_arg_.obj(), std::forward(args)...); - RETURN_NOT_OK(CheckPyError()); + RETURN_IF_PYERROR(); return ret; } From 558a67d9306e8b463cee91bcb0ff83839f821102 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 29 Jun 2021 14:22:21 -0400 Subject: [PATCH 6/9] uncomment visitor --- python/pyarrow/tests/test_dataset.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index fb0454f1b12..23bf8e5b6c3 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3096,10 +3096,9 @@ def test_write_dataset_use_threads(tempdir): target1 = tempdir / 'partitioned1' paths_written = [] - def writer_pre_finish(writer): - pass - # paths_written.append(writer.path) + paths_written.append(writer.path) + ds.write_dataset( dataset, target1, format="feather", partitioning=partitioning, use_threads=True, writer_pre_finish=writer_pre_finish From 0a12db8b9b48325bda1d6d9ecaba18c65bd5fc77 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 30 Jun 2021 17:14:29 -0400 Subject: [PATCH 7/9] remove dataset changes leaving only BindFunction --- python/pyarrow/_dataset.pyx | 87 +------------------ python/pyarrow/dataset.py | 6 +- python/pyarrow/includes/libarrow_dataset.pxd | 8 -- python/pyarrow/includes/libarrow_fs.pxd | 4 - .../tests/bound_function_visit_strings.pyx | 68 +++++++++++++++ python/pyarrow/tests/test_cython.py | 48 ++++++++-- python/pyarrow/tests/test_dataset.py | 17 ---- 7 files changed, 113 insertions(+), 125 deletions(-) create mode 100644 python/pyarrow/tests/bound_function_visit_strings.pyx diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index f8d25c43450..bd93da9cb18 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2994,7 +2994,7 @@ def _get_partition_keys(Expression partition_expression): For example, an expression of - is converted to {'part': 'A', 'year': 2016} + is converted to {'part': 'a', 'year': 2016} """ cdef: CExpression expr = partition_expression.unwrap() @@ -3009,41 +3009,6 @@ def _get_partition_keys(Expression partition_expression): return out -cdef class FragmentWriter(_Weakrefable): - cdef: - CFileWriter* writer - - def __init__(self): - _forbid_instantiation(self.__class__) - - cdef void init(self, CFileWriter* writer): - self.writer = writer - - @staticmethod - cdef wrap(CFileWriter* writer): - cdef FragmentWriter self = FragmentWriter.__new__(FragmentWriter) - self.init(writer) - return self - - @property - def path(self): - cdef bytes path = deref(self.writer).destination().path - return path.decode('utf-8') - - @property - def filesystem(self): - cdef: - shared_ptr[CFileSystem] c_filesystem - c_filesystem = deref(self.writer).destination().filesystem - return FileSystem.wrap(c_filesystem) - - @staticmethod - cdef void _wrap_visitor(PyObject* raw_visitor, - CFileWriter* writer) except *: - visitor = PyObject_to_object(raw_visitor) - visitor(FragmentWriter.wrap(writer)) - - def _filesystemdataset_write( Scanner data not None, object base_dir not None, @@ -3052,7 +3017,6 @@ def _filesystemdataset_write( Partitioning partitioning not None, FileWriteOptions file_options not None, int max_partitions, - writer_pre_finish=None, ): """ CFileSystemDataset.Write wrapper @@ -3068,56 +3032,7 @@ def _filesystemdataset_write( c_options.partitioning = partitioning.unwrap() c_options.max_partitions = max_partitions c_options.basename_template = tobytes(basename_template) - if writer_pre_finish is not None: - c_options.writer_pre_finish = BindFunction[CFileWriterVisitor]( - &FragmentWriter._wrap_visitor, writer_pre_finish) c_scanner = data.unwrap() with nogil: check_status(CFileSystemDataset.Write(c_options, c_scanner)) - - -# basic test to roundtrip through a BoundFunction - -ctypedef CStatus visit_string_cb(const c_string&) - -cdef extern from * namespace "arrow::py" nogil: - """ - #include - #include - #include - - #include "arrow/status.h" - - namespace arrow { - namespace py { - - Status VisitStrings(const std::vector& strs, - std::function cb) { - for (const std::string& str : strs) { - RETURN_NOT_OK(cb(str)); - } - return Status::OK(); - } - - } // namespace py - } // namespace arrow - """ - cdef CStatus CVisitStrings" arrow::py::VisitStrings"( - vector[c_string], function[visit_string_cb]) - - -cdef void _visit_strings_impl(py_cb, const c_string& s) except *: - py_cb(frombytes(s)) - - -def _visit_strings(strings, cb): - cdef: - function[visit_string_cb] c_cb - vector[c_string] c_strings - - c_cb = BindFunction[visit_string_cb](&_visit_strings_impl, cb) - for s in strings: - c_strings.push_back(tobytes(s)) - - check_status(CVisitStrings(c_strings, c_cb)) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index b13f4461c9b..b93f492dd38 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -34,7 +34,6 @@ FileSystemFactoryOptions, FileWriteOptions, Fragment, - FragmentWriter, HivePartitioning, IpcFileFormat, IpcFileWriteOptions, @@ -691,8 +690,7 @@ def _ensure_write_partitioning(scheme): def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, schema=None, filesystem=None, file_options=None, use_threads=True, - use_async=False, max_partitions=None, - writer_pre_finish=None): + use_async=False, max_partitions=None): """ Write a dataset to a given format and partitioning. @@ -786,5 +784,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None, _filesystemdataset_write( scanner, base_dir, basename_template, filesystem, partitioning, - file_options, max_partitions, writer_pre_finish + file_options, max_partitions ) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 970d455e43c..8cab5536647 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -235,13 +235,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: vector[int] row_group_ids) CStatus EnsureCompleteMetadata() - cdef cppclass CFileWriter "arrow::dataset::FileWriter": - const CFileLocator& destination() const - -ctypedef CStatus CFileWriterVisitor(CFileWriter*) - -cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: - cdef cppclass CFileSystemDatasetWriteOptions \ "arrow::dataset::FileSystemDatasetWriteOptions": shared_ptr[CFileWriteOptions] file_write_options @@ -250,7 +243,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CPartitioning] partitioning int max_partitions c_string basename_template - function[CFileWriterVisitor] writer_pre_finish cdef cppclass CFileSystemDataset \ "arrow::dataset::FileSystemDataset"(CDataset): diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd index eef3757bff0..52ef97e5757 100644 --- a/python/pyarrow/includes/libarrow_fs.pxd +++ b/python/pyarrow/includes/libarrow_fs.pxd @@ -52,10 +52,6 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: c_bool allow_not_found c_bool recursive - cdef cppclass CFileLocator "arrow::fs::FileLocator": - shared_ptr[CFileSystem] filesystem - c_string path - cdef cppclass CFileSystem "arrow::fs::FileSystem": shared_ptr[CFileSystem] shared_from_this() c_string type_name() const diff --git a/python/pyarrow/tests/bound_function_visit_strings.pyx b/python/pyarrow/tests/bound_function_visit_strings.pyx new file mode 100644 index 00000000000..90437be8cde --- /dev/null +++ b/python/pyarrow/tests/bound_function_visit_strings.pyx @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language=c++ +# cython: language_level = 3 + +import pyarrow as pa +from pyarrow.lib cimport * +from pyarrow.lib import frombytes, tobytes + +# basic test to roundtrip through a BoundFunction + +ctypedef CStatus visit_string_cb(const c_string&) + +cdef extern from * namespace "arrow::py" nogil: + """ + #include + #include + #include + + #include "arrow/status.h" + + namespace arrow { + namespace py { + + Status VisitStrings(const std::vector& strs, + std::function cb) { + for (const std::string& str : strs) { + RETURN_NOT_OK(cb(str)); + } + return Status::OK(); + } + + } // namespace py + } // namespace arrow + """ + cdef CStatus CVisitStrings" arrow::py::VisitStrings"( + vector[c_string], function[visit_string_cb]) + + +cdef void _visit_strings_impl(py_cb, const c_string& s) except *: + py_cb(frombytes(s)) + + +def _visit_strings(strings, cb): + cdef: + function[visit_string_cb] c_cb + vector[c_string] c_strings + + c_cb = BindFunction[visit_string_cb](&_visit_strings_impl, cb) + for s in strings: + c_strings.push_back(tobytes(s)) + + check_status(CVisitStrings(c_strings, c_cb)) diff --git a/python/pyarrow/tests/test_cython.py b/python/pyarrow/tests/test_cython.py index b852981ba39..39733b6ae4e 100644 --- a/python/pyarrow/tests/test_cython.py +++ b/python/pyarrow/tests/test_cython.py @@ -27,6 +27,11 @@ here = os.path.dirname(os.path.abspath(__file__)) +test_ld_path = os.environ.get('PYARROW_TEST_LD_PATH', '') +if os.name == 'posix': + compiler_opts = ['-std=c++11'] +else: + compiler_opts = [] setup_template = """if 1: @@ -82,18 +87,12 @@ def test_cython_api(tmpdir): # Fail early if cython is not found import cython # noqa - test_ld_path = os.environ.get('PYARROW_TEST_LD_PATH', '') - with tmpdir.as_cwd(): # Set up temporary workspace pyx_file = 'pyarrow_cython_example.pyx' shutil.copyfile(os.path.join(here, pyx_file), os.path.join(str(tmpdir), pyx_file)) # Create setup.py file - if os.name == 'posix': - compiler_opts = ['-std=c++11'] - else: - compiler_opts = [] setup_code = setup_template.format(pyx_file=pyx_file, compiler_opts=compiler_opts, test_ld_path=test_ld_path) @@ -141,3 +140,40 @@ def test_cython_api(tmpdir): subprocess.check_call([sys.executable, '-c', code], stdout=subprocess.PIPE, env=subprocess_env) + +@pytest.mark.cython +def test_visit_strings(tmpdir): + with tmpdir.as_cwd(): + # Set up temporary workspace + pyx_file = 'bound_function_visit_strings.pyx' + shutil.copyfile(os.path.join(here, pyx_file), + os.path.join(str(tmpdir), pyx_file)) + # Create setup.py file + setup_code = setup_template.format(pyx_file=pyx_file, + compiler_opts=compiler_opts, + test_ld_path=test_ld_path) + with open('setup.py', 'w') as f: + f.write(setup_code) + + subprocess_env = test_util.get_modified_env_with_pythonpath() + + # Compile extension module + subprocess.check_call([sys.executable, 'setup.py', + 'build_ext', '--inplace'], + env=subprocess_env) + + sys.path.insert(0, str(tmpdir)) + mod = __import__('bound_function_visit_strings') + + strings = ['a', 'b', 'c'] + visited = [] + mod._visit_strings(strings, visited.append) + + assert visited == strings + + with pytest.raises(ValueError, match="wtf"): + def raise_on_b(s): + if s == 'b': + raise ValueError('wtf') + + mod._visit_strings(strings, raise_on_b) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 23bf8e5b6c3..340d5edb6fd 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3403,20 +3403,3 @@ def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader): ) table = dataset_reader.to_table(fsds) assert table.schema == schema - - -def test_visit_strings_adhoc(): - import pyarrow._dataset as _ds - - strings = ['a', 'b', 'c'] - visited = [] - _ds._visit_strings(strings, visited.append) - - assert visited == strings - - with pytest.raises(ValueError, match="wtf"): - def raise_on_b(s): - if s == 'b': - raise ValueError('wtf') - - _ds._visit_strings(strings, raise_on_b) From 561ab30abf69f25b9d2adf2cf4753da984f58c5d Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 30 Jun 2021 20:57:22 -0400 Subject: [PATCH 8/9] lint fix --- python/pyarrow/tests/test_cython.py | 1 + python/pyarrow/tests/test_dataset.py | 1 + 2 files changed, 2 insertions(+) diff --git a/python/pyarrow/tests/test_cython.py b/python/pyarrow/tests/test_cython.py index 39733b6ae4e..e202b417a18 100644 --- a/python/pyarrow/tests/test_cython.py +++ b/python/pyarrow/tests/test_cython.py @@ -141,6 +141,7 @@ def test_cython_api(tmpdir): stdout=subprocess.PIPE, env=subprocess_env) + @pytest.mark.cython def test_visit_strings(tmpdir): with tmpdir.as_cwd(): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 340d5edb6fd..d22c2cc7060 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3096,6 +3096,7 @@ def test_write_dataset_use_threads(tempdir): target1 = tempdir / 'partitioned1' paths_written = [] + def writer_pre_finish(writer): paths_written.append(writer.path) From 349858d201507367474daca1d7a16af8e72998d3 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 30 Jun 2021 20:59:46 -0400 Subject: [PATCH 9/9] fix one more lingering dataset change --- python/pyarrow/tests/test_dataset.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d22c2cc7060..5e83657ebf2 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3095,17 +3095,10 @@ def test_write_dataset_use_threads(tempdir): pa.schema([("part", pa.string())]), flavor="hive") target1 = tempdir / 'partitioned1' - paths_written = [] - - def writer_pre_finish(writer): - paths_written.append(writer.path) - ds.write_dataset( dataset, target1, format="feather", partitioning=partitioning, - use_threads=True, writer_pre_finish=writer_pre_finish + use_threads=True ) - assert paths_written == [] - target2 = tempdir / 'partitioned2' ds.write_dataset( dataset, target2, format="feather", partitioning=partitioning,