From 4dfd0d6c4f69a08bfe24aaa858984a4024d13a06 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 23 Jan 2024 17:27:36 +0000 Subject: [PATCH 1/9] Expose the device interface low-level with Array._export_to_c_device method --- cpp/src/arrow/c/bridge.cc | 2 +- python/pyarrow/array.pxi | 27 +++++++++++++++++++++++++++ python/pyarrow/includes/libarrow.pxd | 11 +++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 022fce72f59..a4eddfbecb6 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -587,7 +587,7 @@ struct ArrayExporter { export_.buffers_.resize(n_buffers); std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(), [](const std::shared_ptr& buffer) -> const void* { - return buffer ? buffer->data() : nullptr; + return buffer ? reinterpret_cast(buffer->address()) : nullptr; }); if (need_variadic_buffer_sizes) { diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 1029f3a6298..caf5904d98e 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1682,6 +1682,33 @@ cdef class Array(_PandasConvertible): c_ptr, c_schema_ptr)) + def _export_to_c_device(self, out_ptr, out_schema_ptr): + """ + Export to a C ArrowDeviceArray struct, given its pointer. + + If a C ArrowSchema struct pointer is also given, the array type + is exported to it at the same time. + + Parameters + ---------- + out_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + out_schema_ptr: int (optional) + The raw pointer to a C ArrowSchema struct. + + Be careful: if you don't pass the ArrowDeviceArray struct to a consumer, + array memory will leak. This is a low-level function intended for + expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(out_ptr) + void* c_schema_ptr = _as_c_pointer(out_schema_ptr, + allow_null=True) + with nogil: + check_status(ExportDeviceArray( + deref(self.sp_array), NULL, + c_ptr, c_schema_ptr)) + @staticmethod def _import_from_c(in_ptr, type): """ diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index d92f09da779..2affdce0a96 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -343,6 +343,12 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CResult[unique_ptr[CResizableBuffer]] AllocateResizableBuffer( const int64_t size, CMemoryPool* pool) + cdef cppclass CSyncEvent" arrow::Device::SyncEvent": + pass + + cdef cppclass CDevice" arrow::Device": + pass + cdef CMemoryPool* c_default_memory_pool" arrow::default_memory_pool"() cdef CMemoryPool* c_system_memory_pool" arrow::system_memory_pool"() cdef CStatus c_jemalloc_memory_pool" arrow::jemalloc_memory_pool"( @@ -2811,6 +2817,9 @@ cdef extern from "arrow/c/abi.h": cdef struct ArrowArrayStream: void (*release)(ArrowArrayStream*) noexcept nogil + cdef struct ArrowDeviceArray: + pass + cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportType(CDataType&, ArrowSchema* out) CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*) @@ -2840,6 +2849,8 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader( ArrowArrayStream*) + CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent], + ArrowDeviceArray* out, ArrowSchema*) cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) From 3b28616fab0b9ff06629e620abda5d9be3c1ba3c Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 7 Feb 2024 17:51:45 +0100 Subject: [PATCH 2/9] add import --- cpp/src/arrow/c/bridge.cc | 21 ++++++- cpp/src/arrow/c/bridge.h | 7 +++ python/pyarrow/array.pxi | 91 +++++++++++++++++++--------- python/pyarrow/cffi.py | 10 +++ python/pyarrow/includes/libarrow.pxd | 4 ++ python/pyarrow/tests/test_cffi.py | 51 ++++++++++++++++ 6 files changed, 156 insertions(+), 28 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index a4eddfbecb6..3e9c686ab84 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -587,7 +587,8 @@ struct ArrayExporter { export_.buffers_.resize(n_buffers); std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(), [](const std::shared_ptr& buffer) -> const void* { - return buffer ? reinterpret_cast(buffer->address()) : nullptr; + return buffer ? reinterpret_cast(buffer->address()) + : nullptr; }); if (need_variadic_buffer_sizes) { @@ -1977,6 +1978,24 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, return ImportDeviceArray(array, *maybe_type, mapper); } +Result> DefaultDeviceMapper(ArrowDeviceType device_type, + int64_t device_id) { + if (device_type != ARROW_DEVICE_CPU) { + return Status::NotImplemented("Only importing data on CPU is supported"); + } + return default_cpu_memory_manager(); +} + +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + std::shared_ptr type) { + return ImportDeviceArray(array, type, DefaultDeviceMapper); +} + +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + struct ArrowSchema* type) { + return ImportDeviceArray(array, type, DefaultDeviceMapper); +} + Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, std::shared_ptr schema, const DeviceMemoryMapper& mapper) { diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index e98a42818f6..463ca0500de 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -249,6 +249,13 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, struct ArrowSchema* type, const DeviceMemoryMapper& mapper); +ARROW_EXPORT +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + std::shared_ptr type); +ARROW_EXPORT +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + struct ArrowSchema* type); + /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data /// interface. /// diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index caf5904d98e..48d02d9661e 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1682,33 +1682,6 @@ cdef class Array(_PandasConvertible): c_ptr, c_schema_ptr)) - def _export_to_c_device(self, out_ptr, out_schema_ptr): - """ - Export to a C ArrowDeviceArray struct, given its pointer. - - If a C ArrowSchema struct pointer is also given, the array type - is exported to it at the same time. - - Parameters - ---------- - out_ptr: int - The raw pointer to a C ArrowDeviceArray struct. - out_schema_ptr: int (optional) - The raw pointer to a C ArrowSchema struct. - - Be careful: if you don't pass the ArrowDeviceArray struct to a consumer, - array memory will leak. This is a low-level function intended for - expert users. - """ - cdef: - void* c_ptr = _as_c_pointer(out_ptr) - void* c_schema_ptr = _as_c_pointer(out_schema_ptr, - allow_null=True) - with nogil: - check_status(ExportDeviceArray( - deref(self.sp_array), NULL, - c_ptr, c_schema_ptr)) - @staticmethod def _import_from_c(in_ptr, type): """ @@ -1805,6 +1778,70 @@ cdef class Array(_PandasConvertible): return pyarrow_wrap_array(array) + def _export_to_c_device(self, out_ptr, out_schema_ptr=0): + """ + Export to a C ArrowDeviceArray struct, given its pointer. + + If a C ArrowSchema struct pointer is also given, the array type + is exported to it at the same time. + + Parameters + ---------- + out_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + out_schema_ptr: int (optional) + The raw pointer to a C ArrowSchema struct. + + Be careful: if you don't pass the ArrowDeviceArray struct to a consumer, + array memory will leak. This is a low-level function intended for + expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(out_ptr) + void* c_schema_ptr = _as_c_pointer(out_schema_ptr, + allow_null=True) + with nogil: + check_status(ExportDeviceArray( + deref(self.sp_array), NULL, + c_ptr, c_schema_ptr)) + + @staticmethod + def _import_from_c_device(in_ptr, type): + """ + Import Array from a C ArrowDeviceArray struct, given its pointer + and the imported array type. + + Parameters + ---------- + in_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + type: DataType or int + Either a DataType object, or the raw pointer to a C ArrowSchema + struct. + + This is a low-level function intended for expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_type_ptr + shared_ptr[CArray] c_array + + c_type = pyarrow_unwrap_data_type(type) + if c_type == nullptr: + # Not a DataType object, perhaps a raw ArrowSchema pointer + c_type_ptr = _as_c_pointer(type) + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, + c_type_ptr) + ) + else: + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, c_type) + ) + return pyarrow_wrap_array(c_array) + def __dlpack__(self, stream=None): """Export a primitive array as a DLPack capsule. diff --git a/python/pyarrow/cffi.py b/python/pyarrow/cffi.py index 961b61dee59..1da1a916914 100644 --- a/python/pyarrow/cffi.py +++ b/python/pyarrow/cffi.py @@ -64,6 +64,16 @@ // Opaque producer-specific data void* private_data; }; + + typedef int32_t ArrowDeviceType; + + struct ArrowDeviceArray { + struct ArrowArray array; + int64_t device_id; + ArrowDeviceType device_type; + void* sync_event; + int64_t reserved[3]; + }; """ # TODO use out-of-line mode for faster import and avoid C parsing diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 2affdce0a96..3334a2c988f 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2851,6 +2851,10 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent], ArrowDeviceArray* out, ArrowSchema*) + CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*, + shared_ptr[CDataType]) + CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*, + ArrowSchema*) cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index ff81b06440f..86651479005 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -601,3 +601,54 @@ def test_roundtrip_batch_reader_capsule(): assert imported_reader.read_next_batch().equals(batch) with pytest.raises(StopIteration): imported_reader.read_next_batch() + + +@needs_cffi +def test_export_import_device_array(): + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + + gc.collect() # Make sure no Arrow data dangles in a ref cycle + old_allocated = pa.total_allocated_bytes() + + # Type is known up front + typ = pa.list_(pa.int32()) + arr = pa.array([[1], [2, 42]], type=typ) + py_value = arr.to_pylist() + arr._export_to_c_device(ptr_array) + assert pa.total_allocated_bytes() > old_allocated + + # verify exported struct + assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1 + assert c_array.device_id == -1 + assert c_array.array.length == 2 + + # Delete recreate C++ object from exported pointer + del arr + arr_new = pa.Array._import_from_c_device(ptr_array, typ) + assert arr_new.to_pylist() == py_value + assert arr_new.type == pa.list_(pa.int32()) + assert pa.total_allocated_bytes() > old_allocated + del arr_new, typ + assert pa.total_allocated_bytes() == old_allocated + # Now released + with assert_array_released: + pa.Array._import_from_c(ptr_array, pa.list_(pa.int32())) + + # Type is exported and imported at the same time + arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32())) + py_value = arr.to_pylist() + arr._export_to_c(ptr_array, ptr_schema) + # Delete and recreate C++ objects from exported pointers + del arr + arr_new = pa.Array._import_from_c(ptr_array, ptr_schema) + assert arr_new.to_pylist() == py_value + assert arr_new.type == pa.list_(pa.int32()) + assert pa.total_allocated_bytes() > old_allocated + del arr_new + assert pa.total_allocated_bytes() == old_allocated + # Now released + with assert_schema_released: + pa.Array._import_from_c(ptr_array, ptr_schema) From 596175df3464af1f10d46585221e038f0f4d6673 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 8 Feb 2024 16:16:26 +0100 Subject: [PATCH 3/9] add export/import for RecordBatch --- cpp/src/arrow/c/bridge.cc | 10 +++++ cpp/src/arrow/c/bridge.h | 7 ++++ python/pyarrow/includes/libarrow.pxd | 15 +++++-- python/pyarrow/table.pxi | 62 ++++++++++++++++++++++++++++ python/pyarrow/tests/test_cffi.py | 61 +++++++++++++++++++++++++++ 5 files changed, 151 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 3e9c686ab84..495234344b4 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -2016,6 +2016,16 @@ Result> ImportDeviceRecordBatch( return ImportDeviceRecordBatch(array, *maybe_schema, mapper); } +Result> ImportDeviceRecordBatch( + struct ArrowDeviceArray* array, std::shared_ptr schema) { + return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper); +} + +Result> ImportDeviceRecordBatch( + struct ArrowDeviceArray* array, struct ArrowSchema* schema) { + return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper); +} + ////////////////////////////////////////////////////////////////////////// // C stream export diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 463ca0500de..6a381005eb1 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -292,6 +292,13 @@ Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, struct ArrowSchema* schema, const DeviceMemoryMapper& mapper); +ARROW_EXPORT +Result> ImportDeviceRecordBatch( + struct ArrowDeviceArray* array, std::shared_ptr schema); +ARROW_EXPORT +Result> ImportDeviceRecordBatch( + struct ArrowDeviceArray* array, struct ArrowSchema* schema); + /// @} /// \defgroup c-stream-interface Functions for working with the C data interface. diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 3334a2c988f..35a9ff5569d 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2851,10 +2851,17 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent], ArrowDeviceArray* out, ArrowSchema*) - CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*, - shared_ptr[CDataType]) - CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*, - ArrowSchema*) + CResult[shared_ptr[CArray]] ImportDeviceArray( + ArrowDeviceArray*, shared_ptr[CDataType]) + CResult[shared_ptr[CArray]] ImportDeviceArray( + ArrowDeviceArray*, ArrowSchema*) + + CStatus ExportDeviceRecordBatch(const CRecordBatch&, shared_ptr[CSyncEvent], + ArrowDeviceArray* out, ArrowSchema*) + CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch( + ArrowDeviceArray*, shared_ptr[CSchema]) + CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch( + ArrowDeviceArray*, ArrowSchema*) cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index abda784fb7c..b92b92147e4 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3084,6 +3084,68 @@ cdef class RecordBatch(_Tabular): return pyarrow_wrap_batch(c_batch) + def _export_to_c_device(self, out_ptr, out_schema_ptr=0): + """ + Export to a C ArrowDeviceArray struct, given its pointer. + + If a C ArrowSchema struct pointer is also given, the record batch + schema is exported to it at the same time. + + Parameters + ---------- + out_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + out_schema_ptr: int (optional) + The raw pointer to a C ArrowSchema struct. + + Be careful: if you don't pass the ArrowDeviceArray struct to a consumer, + array memory will leak. This is a low-level function intended for + expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(out_ptr) + void* c_schema_ptr = _as_c_pointer(out_schema_ptr, + allow_null=True) + with nogil: + check_status(ExportDeviceRecordBatch( + deref(self.sp_batch), NULL, + c_ptr, c_schema_ptr) + ) + + @staticmethod + def _import_from_c_device(in_ptr, schema): + """ + Import RecordBatch from a C ArrowDeviceArray struct, given its pointer + and the imported schema. + + Parameters + ---------- + in_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + type: Schema or int + Either a Schema object, or the raw pointer to a C ArrowSchema + struct. + + This is a low-level function intended for expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_schema_ptr + shared_ptr[CRecordBatch] c_batch + + c_schema = pyarrow_unwrap_schema(schema) + if c_schema == nullptr: + # Not a Schema object, perhaps a raw ArrowSchema pointer + c_schema_ptr = _as_c_pointer(schema, allow_null=True) + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema_ptr)) + else: + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema)) + return pyarrow_wrap_batch(c_batch) + def _reconstruct_record_batch(columns, schema): """ diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index 86651479005..1bb4d5e2e50 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -652,3 +652,64 @@ def test_export_import_device_array(): # Now released with assert_schema_released: pa.Array._import_from_c(ptr_array, ptr_schema) + + +@needs_cffi +def test_export_import_device_batch(): + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + + gc.collect() # Make sure no Arrow data dangles in a ref cycle + old_allocated = pa.total_allocated_bytes() + + # Schema is known up front + batch = make_batch() + schema = batch.schema + py_value = batch.to_pydict() + batch._export_to_c_device(ptr_array) + assert pa.total_allocated_bytes() > old_allocated + + # verify exported struct + assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1 + assert c_array.device_id == -1 + assert c_array.array.length == 2 + + # Delete and recreate C++ object from exported pointer + del batch + batch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema) + assert batch_new.to_pydict() == py_value + assert batch_new.schema == schema + assert pa.total_allocated_bytes() > old_allocated + del batch_new, schema + assert pa.total_allocated_bytes() == old_allocated + # Now released + with assert_array_released: + pa.RecordBatch._import_from_c_device(ptr_array, make_schema()) + + # Type is exported and imported at the same time + batch = make_batch() + py_value = batch.to_pydict() + batch._export_to_c_device(ptr_array, ptr_schema) + # Delete and recreate C++ objects from exported pointers + del batch + batch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) + assert batch_new.to_pydict() == py_value + assert batch_new.schema == make_batch().schema + assert pa.total_allocated_bytes() > old_allocated + del batch_new + assert pa.total_allocated_bytes() == old_allocated + # Now released + with assert_schema_released: + pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) + + # Not a struct type + pa.int32()._export_to_c(ptr_schema) + make_batch()._export_to_c_device(ptr_array) + with pytest.raises(ValueError, + match="ArrowSchema describes non-struct type"): + pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) + # Now released + with assert_schema_released: + pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) From 6ffc6b83a196f28799c8b6a7d7353631d1ccfeed Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 8 Feb 2024 16:18:10 +0100 Subject: [PATCH 4/9] undo address change, handled in separate PR --- cpp/src/arrow/c/bridge.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 495234344b4..b3ba51c431f 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -587,8 +587,7 @@ struct ArrayExporter { export_.buffers_.resize(n_buffers); std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(), [](const std::shared_ptr& buffer) -> const void* { - return buffer ? reinterpret_cast(buffer->address()) - : nullptr; + return buffer ? buffer->data() : nullptr; }); if (need_variadic_buffer_sizes) { From 864a52cbb730fd82aa8bf8d10dd21754bd48fbc3 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 14 Feb 2024 11:18:17 +0100 Subject: [PATCH 5/9] use default argument --- cpp/src/arrow/c/bridge.cc | 36 +++++++------------------------ cpp/src/arrow/c/bridge.h | 45 +++++++++++++++++---------------------- 2 files changed, 27 insertions(+), 54 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index b3ba51c431f..c38cb5c891d 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -1958,6 +1958,14 @@ Result> ImportRecordBatch(struct ArrowArray* array, return ImportRecordBatch(array, *maybe_schema); } +Result> DefaultDeviceMapper(ArrowDeviceType device_type, + int64_t device_id) { + if (device_type != ARROW_DEVICE_CPU) { + return Status::NotImplemented("Only importing data on CPU is supported"); + } + return default_cpu_memory_manager(); +} + Result> ImportDeviceArray(struct ArrowDeviceArray* array, std::shared_ptr type, const DeviceMemoryMapper& mapper) { @@ -1977,24 +1985,6 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, return ImportDeviceArray(array, *maybe_type, mapper); } -Result> DefaultDeviceMapper(ArrowDeviceType device_type, - int64_t device_id) { - if (device_type != ARROW_DEVICE_CPU) { - return Status::NotImplemented("Only importing data on CPU is supported"); - } - return default_cpu_memory_manager(); -} - -Result> ImportDeviceArray(struct ArrowDeviceArray* array, - std::shared_ptr type) { - return ImportDeviceArray(array, type, DefaultDeviceMapper); -} - -Result> ImportDeviceArray(struct ArrowDeviceArray* array, - struct ArrowSchema* type) { - return ImportDeviceArray(array, type, DefaultDeviceMapper); -} - Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, std::shared_ptr schema, const DeviceMemoryMapper& mapper) { @@ -2015,16 +2005,6 @@ Result> ImportDeviceRecordBatch( return ImportDeviceRecordBatch(array, *maybe_schema, mapper); } -Result> ImportDeviceRecordBatch( - struct ArrowDeviceArray* array, std::shared_ptr schema) { - return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper); -} - -Result> ImportDeviceRecordBatch( - struct ArrowDeviceArray* array, struct ArrowSchema* schema) { - return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper); -} - ////////////////////////////////////////////////////////////////////////// // C stream export diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 6a381005eb1..18d025678ca 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -218,6 +218,9 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, using DeviceMemoryMapper = std::function>(ArrowDeviceType, int64_t)>; +Result> DefaultDeviceMapper(ArrowDeviceType device_type, + int64_t device_id); + /// \brief EXPERIMENTAL: Import C++ device array from the C data interface. /// /// The ArrowArray struct has its contents moved (as per the C data interface @@ -226,12 +229,13 @@ using DeviceMemoryMapper = /// /// \param[in,out] array C data interface struct holding the array data /// \param[in] type type of the imported array -/// \param[in] mapper A function to map device + id to memory manager +/// \param[in] mapper A function to map device + id to memory manager. If not +/// specified, defaults to map "cpu" to the built-in default memory manager. /// \return Imported array object ARROW_EXPORT -Result> ImportDeviceArray(struct ArrowDeviceArray* array, - std::shared_ptr type, - const DeviceMemoryMapper& mapper); +Result> ImportDeviceArray( + struct ArrowDeviceArray* array, std::shared_ptr type, + const DeviceMemoryMapper& mapper = DefaultDeviceMapper); /// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface. /// @@ -242,19 +246,13 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, /// /// \param[in,out] array C data interface struct holding the array data /// \param[in,out] type C data interface struct holding the array type -/// \param[in] mapper A function to map device + id to memory manager +/// \param[in] mapper A function to map device + id to memory manager. If not +/// specified, defaults to map "cpu" to the built-in default memory manager. /// \return Imported array object ARROW_EXPORT -Result> ImportDeviceArray(struct ArrowDeviceArray* array, - struct ArrowSchema* type, - const DeviceMemoryMapper& mapper); - -ARROW_EXPORT -Result> ImportDeviceArray(struct ArrowDeviceArray* array, - std::shared_ptr type); -ARROW_EXPORT -Result> ImportDeviceArray(struct ArrowDeviceArray* array, - struct ArrowSchema* type); +Result> ImportDeviceArray( + struct ArrowDeviceArray* array, struct ArrowSchema* type, + const DeviceMemoryMapper& mapper = DefaultDeviceMapper); /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data /// interface. @@ -266,12 +264,13 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, /// /// \param[in,out] array C data interface struct holding the record batch data /// \param[in] schema schema of the imported record batch -/// \param[in] mapper A function to map device + id to memory manager +/// \param[in] mapper A function to map device + id to memory manager. If not +/// specified, defaults to map "cpu" to the built-in default memory manager. /// \return Imported record batch object ARROW_EXPORT Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, std::shared_ptr schema, - const DeviceMemoryMapper& mapper); + const DeviceMemoryMapper& mapper = DefaultDeviceMapper); /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema /// from the C data interface. @@ -285,19 +284,13 @@ Result> ImportDeviceRecordBatch( /// /// \param[in,out] array C data interface struct holding the record batch data /// \param[in,out] schema C data interface struct holding the record batch schema -/// \param[in] mapper A function to map device + id to memory manager +/// \param[in] mapper A function to map device + id to memory manager. If not +/// specified, defaults to map "cpu" to the built-in default memory manager. /// \return Imported record batch object ARROW_EXPORT Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, struct ArrowSchema* schema, - const DeviceMemoryMapper& mapper); - -ARROW_EXPORT -Result> ImportDeviceRecordBatch( - struct ArrowDeviceArray* array, std::shared_ptr schema); -ARROW_EXPORT -Result> ImportDeviceRecordBatch( - struct ArrowDeviceArray* array, struct ArrowSchema* schema); + const DeviceMemoryMapper& mapper = DefaultDeviceMapper); /// @} From d64f0e006b7d9654374174f8f51e0b0a8a02cb11 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 14 Feb 2024 11:42:53 +0100 Subject: [PATCH 6/9] fixup linting after merge --- python/pyarrow/includes/libarrow.pxd | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 95ee0e95e34..448ad7baa0b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2957,7 +2957,6 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: ArrowDeviceArray*, ArrowSchema*) - cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) CResult[int64_t] ReferencedBufferSize(const CRecordBatch& record_batch) From 5e6c3d50a0a6661895ca93d6ca76d1760957ef79 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 28 Feb 2024 10:15:21 +0100 Subject: [PATCH 7/9] refactor tests --- python/pyarrow/tests/test_cffi.py | 207 +++++++++++------------------- 1 file changed, 75 insertions(+), 132 deletions(-) diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index c1cd6c5a7c3..298ca342370 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -181,11 +181,10 @@ def test_export_import_field(): pa.Field._import_from_c(ptr_schema) -@needs_cffi -def test_export_import_array(): +def check_export_import_array(array_type, exporter, importer): c_schema = ffi.new("struct ArrowSchema*") ptr_schema = int(ffi.cast("uintptr_t", c_schema)) - c_array = ffi.new("struct ArrowArray*") + c_array = ffi.new(f"struct {array_type}*") ptr_array = int(ffi.cast("uintptr_t", c_array)) gc.collect() # Make sure no Arrow data dangles in a ref cycle @@ -195,11 +194,11 @@ def test_export_import_array(): typ = pa.list_(pa.int32()) arr = pa.array([[1], [2, 42]], type=typ) py_value = arr.to_pylist() - arr._export_to_c(ptr_array) + exporter(arr, ptr_array) assert pa.total_allocated_bytes() > old_allocated # Delete recreate C++ object from exported pointer del arr - arr_new = pa.Array._import_from_c(ptr_array, typ) + arr_new = importer(ptr_array, typ) assert arr_new.to_pylist() == py_value assert arr_new.type == pa.list_(pa.int32()) assert pa.total_allocated_bytes() > old_allocated @@ -207,15 +206,15 @@ def test_export_import_array(): assert pa.total_allocated_bytes() == old_allocated # Now released with assert_array_released: - pa.Array._import_from_c(ptr_array, pa.list_(pa.int32())) + importer(ptr_array, pa.list_(pa.int32())) # Type is exported and imported at the same time arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32())) py_value = arr.to_pylist() - arr._export_to_c(ptr_array, ptr_schema) + exporter(arr, ptr_array, ptr_schema) # Delete and recreate C++ objects from exported pointers del arr - arr_new = pa.Array._import_from_c(ptr_array, ptr_schema) + arr_new = importer(ptr_array, ptr_schema) assert arr_new.to_pylist() == py_value assert arr_new.type == pa.list_(pa.int32()) assert pa.total_allocated_bytes() > old_allocated @@ -223,7 +222,35 @@ def test_export_import_array(): assert pa.total_allocated_bytes() == old_allocated # Now released with assert_schema_released: - pa.Array._import_from_c(ptr_array, ptr_schema) + importer(ptr_array, ptr_schema) + + +@needs_cffi +def test_export_import_array(): + check_export_import_array( + "ArrowArray", + pa.Array._export_to_c, + pa.Array._import_from_c, + ) + + +@needs_cffi +def test_export_import_device_array(): + check_export_import_array( + "ArrowDeviceArray", + pa.Array._export_to_c_device, + pa.Array._import_from_c_device, + ) + + # verify exported struct + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32())) + arr._export_to_c_device(ptr_array) + + assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1 + assert c_array.device_id == -1 + assert c_array.array.length == 2 def check_export_import_schema(schema_factory, expected_schema_factory=None): @@ -289,10 +316,10 @@ def test_export_import_schema_float_pointer(): assert schema_new == make_schema() -def check_export_import_batch(batch_factory): +def check_export_import_batch(array_type, exporter, importer, batch_factory): c_schema = ffi.new("struct ArrowSchema*") ptr_schema = int(ffi.cast("uintptr_t", c_schema)) - c_array = ffi.new("struct ArrowArray*") + c_array = ffi.new(f"struct {array_type}*") ptr_array = int(ffi.cast("uintptr_t", c_array)) gc.collect() # Make sure no Arrow data dangles in a ref cycle @@ -302,11 +329,11 @@ def check_export_import_batch(batch_factory): batch = batch_factory() schema = batch.schema py_value = batch.to_pydict() - batch._export_to_c(ptr_array) + exporter(batch, ptr_array) assert pa.total_allocated_bytes() > old_allocated # Delete and recreate C++ object from exported pointer del batch - batch_new = pa.RecordBatch._import_from_c(ptr_array, schema) + batch_new = importer(ptr_array, schema) assert batch_new.to_pydict() == py_value assert batch_new.schema == schema assert pa.total_allocated_bytes() > old_allocated @@ -314,7 +341,7 @@ def check_export_import_batch(batch_factory): assert pa.total_allocated_bytes() == old_allocated # Now released with assert_array_released: - pa.RecordBatch._import_from_c(ptr_array, make_schema()) + importer(ptr_array, make_schema()) # Type is exported and imported at the same time batch = batch_factory() @@ -322,7 +349,7 @@ def check_export_import_batch(batch_factory): batch._export_to_c(ptr_array, ptr_schema) # Delete and recreate C++ objects from exported pointers del batch - batch_new = pa.RecordBatch._import_from_c(ptr_array, ptr_schema) + batch_new = importer(ptr_array, ptr_schema) assert batch_new.to_pydict() == py_value assert batch_new.schema == batch_factory().schema assert pa.total_allocated_bytes() > old_allocated @@ -330,28 +357,56 @@ def check_export_import_batch(batch_factory): assert pa.total_allocated_bytes() == old_allocated # Now released with assert_schema_released: - pa.RecordBatch._import_from_c(ptr_array, ptr_schema) + importer(ptr_array, ptr_schema) # Not a struct type pa.int32()._export_to_c(ptr_schema) batch_factory()._export_to_c(ptr_array) with pytest.raises(ValueError, match="ArrowSchema describes non-struct type"): - pa.RecordBatch._import_from_c(ptr_array, ptr_schema) + importer(ptr_array, ptr_schema) # Now released with assert_schema_released: - pa.RecordBatch._import_from_c(ptr_array, ptr_schema) + importer(ptr_array, ptr_schema) @needs_cffi def test_export_import_batch(): - check_export_import_batch(make_batch) + check_export_import_batch( + "ArrowArray", + pa.RecordBatch._export_to_c, + pa.RecordBatch._import_from_c, + make_batch, + ) @needs_cffi def test_export_import_batch_with_extension(): with registered_extension_type(ParamExtType(1)): - check_export_import_batch(make_extension_batch) + check_export_import_batch( + "ArrowArray", + pa.RecordBatch._export_to_c, + pa.RecordBatch._import_from_c, + make_extension_batch, + ) + +@needs_cffi +def test_export_import_device_batch(): + check_export_import_batch( + "ArrowDeviceArray", + pa.RecordBatch._export_to_c_device, + pa.RecordBatch._import_from_c_device, + make_batch, + ) + + # verify exported struct + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + batch = make_batch() + batch._export_to_c_device(ptr_array) + assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1 + assert c_array.device_id == -1 + assert c_array.array.length == 2 def _export_import_batch_reader(ptr_stream, reader_factory): @@ -627,115 +682,3 @@ def test_roundtrip_chunked_array_capsule_requested_schema(): requested_capsule = requested_type.__arrow_c_schema__() with pytest.raises(NotImplementedError): chunked.__arrow_c_stream__(requested_capsule) - - -@needs_cffi -def test_export_import_device_array(): - c_schema = ffi.new("struct ArrowSchema*") - ptr_schema = int(ffi.cast("uintptr_t", c_schema)) - c_array = ffi.new("struct ArrowDeviceArray*") - ptr_array = int(ffi.cast("uintptr_t", c_array)) - - gc.collect() # Make sure no Arrow data dangles in a ref cycle - old_allocated = pa.total_allocated_bytes() - - # Type is known up front - typ = pa.list_(pa.int32()) - arr = pa.array([[1], [2, 42]], type=typ) - py_value = arr.to_pylist() - arr._export_to_c_device(ptr_array) - assert pa.total_allocated_bytes() > old_allocated - - # verify exported struct - assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1 - assert c_array.device_id == -1 - assert c_array.array.length == 2 - - # Delete recreate C++ object from exported pointer - del arr - arr_new = pa.Array._import_from_c_device(ptr_array, typ) - assert arr_new.to_pylist() == py_value - assert arr_new.type == pa.list_(pa.int32()) - assert pa.total_allocated_bytes() > old_allocated - del arr_new, typ - assert pa.total_allocated_bytes() == old_allocated - # Now released - with assert_array_released: - pa.Array._import_from_c(ptr_array, pa.list_(pa.int32())) - - # Type is exported and imported at the same time - arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32())) - py_value = arr.to_pylist() - arr._export_to_c(ptr_array, ptr_schema) - # Delete and recreate C++ objects from exported pointers - del arr - arr_new = pa.Array._import_from_c(ptr_array, ptr_schema) - assert arr_new.to_pylist() == py_value - assert arr_new.type == pa.list_(pa.int32()) - assert pa.total_allocated_bytes() > old_allocated - del arr_new - assert pa.total_allocated_bytes() == old_allocated - # Now released - with assert_schema_released: - pa.Array._import_from_c(ptr_array, ptr_schema) - - -@needs_cffi -def test_export_import_device_batch(): - c_schema = ffi.new("struct ArrowSchema*") - ptr_schema = int(ffi.cast("uintptr_t", c_schema)) - c_array = ffi.new("struct ArrowDeviceArray*") - ptr_array = int(ffi.cast("uintptr_t", c_array)) - - gc.collect() # Make sure no Arrow data dangles in a ref cycle - old_allocated = pa.total_allocated_bytes() - - # Schema is known up front - batch = make_batch() - schema = batch.schema - py_value = batch.to_pydict() - batch._export_to_c_device(ptr_array) - assert pa.total_allocated_bytes() > old_allocated - - # verify exported struct - assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1 - assert c_array.device_id == -1 - assert c_array.array.length == 2 - - # Delete and recreate C++ object from exported pointer - del batch - batch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema) - assert batch_new.to_pydict() == py_value - assert batch_new.schema == schema - assert pa.total_allocated_bytes() > old_allocated - del batch_new, schema - assert pa.total_allocated_bytes() == old_allocated - # Now released - with assert_array_released: - pa.RecordBatch._import_from_c_device(ptr_array, make_schema()) - - # Type is exported and imported at the same time - batch = make_batch() - py_value = batch.to_pydict() - batch._export_to_c_device(ptr_array, ptr_schema) - # Delete and recreate C++ objects from exported pointers - del batch - batch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) - assert batch_new.to_pydict() == py_value - assert batch_new.schema == make_batch().schema - assert pa.total_allocated_bytes() > old_allocated - del batch_new - assert pa.total_allocated_bytes() == old_allocated - # Now released - with assert_schema_released: - pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) - - # Not a struct type - pa.int32()._export_to_c(ptr_schema) - make_batch()._export_to_c_device(ptr_array) - with pytest.raises(ValueError, - match="ArrowSchema describes non-struct type"): - pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) - # Now released - with assert_schema_released: - pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) From 51064e393735a1173abdacfe03c852b0a2a16964 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 28 Feb 2024 10:35:59 +0100 Subject: [PATCH 8/9] linting --- python/pyarrow/tests/test_cffi.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index 298ca342370..ce50fe6a6f8 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -390,6 +390,7 @@ def test_export_import_batch_with_extension(): make_extension_batch, ) + @needs_cffi def test_export_import_device_batch(): check_export_import_batch( From 2afbc638d02bf3fe04076d67d1a97865ce7bc512 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 28 Feb 2024 13:44:55 +0100 Subject: [PATCH 9/9] add export --- cpp/src/arrow/c/bridge.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 18d025678ca..0ced3d38cd1 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -218,6 +218,7 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, using DeviceMemoryMapper = std::function>(ArrowDeviceType, int64_t)>; +ARROW_EXPORT Result> DefaultDeviceMapper(ArrowDeviceType device_type, int64_t device_id);