From ef8be15ea69cff08a35cbb3546c1b491a29efa3e Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 6 Mar 2024 14:48:24 +0000 Subject: [PATCH 01/12] support + test cuda device interface (with hardcoded dependency on libarrow_cuda for now) --- python/CMakeLists.txt | 1 + python/pyarrow/array.pxi | 9 +- python/pyarrow/includes/libarrow.pxd | 20 ++- python/pyarrow/includes/libarrow_cuda.pxd | 3 + python/pyarrow/includes/libarrow_memory.pxd | 34 +++++ python/pyarrow/table.pxi | 9 +- python/pyarrow/tests/test_cuda.py | 153 ++++++++++++++++++++ 7 files changed, 220 insertions(+), 9 deletions(-) create mode 100644 python/pyarrow/includes/libarrow_memory.pxd diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index af65ea7d614..6fece1ab9bc 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -796,6 +796,7 @@ install(FILES "${ARROW_PYTHON_BINARY_DIR}/lib_api.h" "${ARROW_PYTHON_BINARY_DIR} if(PYARROW_BUILD_CUDA) target_link_libraries(_cuda PRIVATE ${CUDA_LINK_LIBS}) + target_link_libraries(lib PRIVATE ${CUDA_LINK_LIBS}) endif() if(PYARROW_BUILD_FLIGHT) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index def4c5e9ba9..b4ac4b4f1c2 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -21,6 +21,9 @@ import os import warnings from cython import sizeof +from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper +# from pyarrow.includes.libarrow_memory cimport DeviceMapper + cdef _sequence_to_array(object sequence, object mask, object size, DataType type, CMemoryPool* pool, c_bool from_pandas): @@ -1834,12 +1837,14 @@ cdef class Array(_PandasConvertible): with nogil: c_array = GetResultValue( ImportDeviceArray( c_ptr, - c_type_ptr) + c_type_ptr, + DefaultMemoryMapper) ) else: with nogil: c_array = GetResultValue( - ImportDeviceArray( c_ptr, c_type) + ImportDeviceArray( c_ptr, c_type, + DefaultMemoryMapper) ) return pyarrow_wrap_array(c_array) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9e5e3d3fa68..1d28f944818 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -346,6 +346,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CResult[unique_ptr[CResizableBuffer]] AllocateResizableBuffer( const int64_t size, CMemoryPool* pool) + cdef cppclass CMemoryManager" arrow::MemoryManager": + pass + cdef cppclass CSyncEvent" arrow::Device::SyncEvent": pass @@ -2916,6 +2919,8 @@ cdef extern from "arrow/c/abi.h": cdef struct ArrowArrayStream: void (*release)(ArrowArrayStream*) noexcept nogil + ctypedef int32_t ArrowDeviceType + cdef struct ArrowDeviceArray: pass @@ -2951,20 +2956,25 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*) CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*) + ctypedef CResult[shared_ptr[CMemoryManager]] CDeviceMemoryMapper( + ArrowDeviceType, int64_t) + + CResult[shared_ptr[CMemoryManager]] DefaultDeviceMapper( + ArrowDeviceType device_type, int64_t device_id) + CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent], ArrowDeviceArray* out, ArrowSchema*) CResult[shared_ptr[CArray]] ImportDeviceArray( - ArrowDeviceArray*, shared_ptr[CDataType]) + ArrowDeviceArray*, shared_ptr[CDataType], const CDeviceMemoryMapper&) CResult[shared_ptr[CArray]] ImportDeviceArray( - ArrowDeviceArray*, ArrowSchema*) + ArrowDeviceArray*, ArrowSchema*, const CDeviceMemoryMapper&) CStatus ExportDeviceRecordBatch(const CRecordBatch&, 
shared_ptr[CSyncEvent], ArrowDeviceArray* out, ArrowSchema*) CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch( - ArrowDeviceArray*, shared_ptr[CSchema]) + ArrowDeviceArray*, shared_ptr[CSchema], const CDeviceMemoryMapper&) CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch( - ArrowDeviceArray*, ArrowSchema*) - + ArrowDeviceArray*, ArrowSchema*, const CDeviceMemoryMapper&) cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd index 3ac943cf941..ac9c9f6a0e7 100644 --- a/python/pyarrow/includes/libarrow_cuda.pxd +++ b/python/pyarrow/includes/libarrow_cuda.pxd @@ -93,6 +93,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil: CResult[shared_ptr[CCudaHostBuffer]] AllocateCudaHostBuffer( int device_number, const int64_t size) + CResult[shared_ptr[CMemoryManager]] DefaultMemoryMapper( + ArrowDeviceType device_type, int64_t device_id) + # Cuda prefix is added to avoid picking up arrow::cuda functions # from arrow namespace. CResult[shared_ptr[CCudaBuffer]] \ diff --git a/python/pyarrow/includes/libarrow_memory.pxd b/python/pyarrow/includes/libarrow_memory.pxd new file mode 100644 index 00000000000..463beb6f0b6 --- /dev/null +++ b/python/pyarrow/includes/libarrow_memory.pxd @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# distutils: language = c++ + +from pyarrow.includes.libarrow cimport * + + +cdef extern from *: + """ + #ifdef ARROW_CUDA + #include "arrow/gpu/cuda_api.h" + using DeviceMapper = arrow::cuda::DefaultMemoryMapper; + #else + #include "arrow/c/bridge.h" + using DeviceMapper = arrow::DefaultDeviceMapper; + #endif + """ + CResult[shared_ptr[CMemoryManager]] DeviceMapper( + ArrowDeviceType device_type, int64_t device_id) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 164427d2747..4062b99500f 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -20,6 +20,10 @@ from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCap import warnings from cython import sizeof +from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper +# from pyarrow.includes.libarrow_memory cimport DeviceMapper + + cdef class ChunkedArray(_PandasConvertible): """ An array-like composed from a (possibly empty) collection of pyarrow.Arrays @@ -3596,11 +3600,12 @@ cdef class RecordBatch(_Tabular): c_schema_ptr = _as_c_pointer(schema, allow_null=True) with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema_ptr)) + c_ptr, c_schema_ptr, + DefaultMemoryMapper)) else: with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema)) + c_ptr, c_schema, DefaultMemoryMapper)) return pyarrow_wrap_batch(c_batch) diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py index 43cd16a3cf6..bf52b77d179 100644 --- a/python/pyarrow/tests/test_cuda.py +++ b/python/pyarrow/tests/test_cuda.py @@ -792,3 +792,156 @@ def test_IPC(size): p.start() p.join() assert p.exitcode == 0 + + +def _arr_copy_to_host(carr): + # TODO replace below with copy to device when exposed in python + buffers = [] + for cbuf in carr.buffers(): + if cbuf is None: + buffers.append(None) + else: + buf = global_context.foreign_buffer( + cbuf.address, cbuf.size, cbuf + ).copy_to_host() + buffers.append(buf) + + child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:]) + new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child]) + return new + + +def test_device_interface_array(): + cffi = pytest.importorskip("pyarrow.cffi") + ffi = cffi.ffi + + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + + typ = pa.list_(pa.int32()) + arr = pa.array([[1], [2, 42]], type=typ) + + # TODO replace below with copy to device when exposed in python + cbuffers = [] + for buf in arr.buffers(): + if buf is None: + cbuffers.append(None) + else: + cbuf = global_context.new_buffer(buf.size) + cbuf.copy_from_host(buf, position=0, nbytes=buf.size) + cbuffers.append(cbuf) + + carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ + pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) + ]) + + # Type is known up front + carr._export_to_c_device(ptr_array) + + # verify exported struct + assert c_array.device_type == 2 # ARROW_DEVICE_CUDA 2 + assert c_array.device_id == global_context.device_number + assert c_array.array.length == 2 + + # Delete recreate C++ object from exported pointer + del carr + carr_new = pa.Array._import_from_c_device(ptr_array, typ) + assert carr_new.type == pa.list_(pa.int32()) + arr_new = _arr_copy_to_host(carr_new) + assert arr_new.equals(arr) + + del carr_new + # Now released + with pytest.raises(ValueError, match="Cannot import released ArrowArray"): + 
pa.Array._import_from_c_device(ptr_array, typ) + + # Schema is exported and imported at the same time + carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ + pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) + ]) + carr._export_to_c_device(ptr_array, ptr_schema) + # Delete and recreate C++ objects from exported pointers + del carr + carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema) + assert carr_new.type == pa.list_(pa.int32()) + arr_new = _arr_copy_to_host(carr_new) + assert arr_new.equals(arr) + + del carr_new + # Now released + with pytest.raises(ValueError, match="Cannot import released ArrowSchema"): + pa.Array._import_from_c_device(ptr_array, ptr_schema) + + +def _batch_copy_to_host(cbatch): + # TODO replace below with copy to device when exposed in python + arrs = [] + for col in cbatch.columns: + buffers = [ + global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host() + if buf is not None else None + for buf in col.buffers() + ] + new = pa.Array.from_buffers(col.type, len(col), buffers) + arrs.append(new) + + return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema) + + +def test_device_interface_batch_array(): + cffi = pytest.importorskip("pyarrow.cffi") + ffi = cffi.ffi + + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + + batch = make_recordbatch(10) + schema = batch.schema + hbuf = batch.serialize() + cbuf = cuda.serialize_record_batch(batch, global_context) + cbatch = cuda.read_record_batch(cbuf, schema) + + # Schema is known up front + cbatch._export_to_c_device(ptr_array) + + # verify exported struct + assert c_array.device_type == 2 # ARROW_DEVICE_CUDA 2 + assert c_array.device_id == global_context.device_number + assert c_array.array.length == 10 + + # Delete recreate C++ object from exported pointer + del cbatch + cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema) + assert cbatch_new.schema == schema + batch_new = _batch_copy_to_host(cbatch_new) + assert batch_new.equals(batch) + + del cbatch_new + # Now released + with pytest.raises(ValueError, match="Cannot import released ArrowArray"): + pa.RecordBatch._import_from_c_device(ptr_array, schema) + + # Schema is exported and imported at the same time + cbatch = cuda.read_record_batch(cbuf, schema) + cbatch._export_to_c_device(ptr_array, ptr_schema) + # Delete and recreate C++ objects from exported pointers + del cbatch + cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) + assert cbatch_new.schema == schema + batch_new = _batch_copy_to_host(cbatch_new) + assert batch_new.equals(batch) + + del cbatch_new + # Now released + with pytest.raises(ValueError, match="Cannot import released ArrowSchema"): + pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) + + # Not a struct type + pa.int32()._export_to_c(ptr_schema) + with pytest.raises(ValueError, + match="ArrowSchema describes non-struct type"): + pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) From 143f511082611fd2ea403b0c597ecc5e052c183c Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 6 Mar 2024 16:04:33 +0100 Subject: [PATCH 02/12] test on cpu --- python/pyarrow/array.pxi | 4 ++-- python/pyarrow/table.pxi | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index b4ac4b4f1c2..24582a9796d 100644 --- a/python/pyarrow/array.pxi +++ 
b/python/pyarrow/array.pxi @@ -21,8 +21,8 @@ import os import warnings from cython import sizeof -from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper -# from pyarrow.includes.libarrow_memory cimport DeviceMapper +# from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper +from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultMemoryMapper cdef _sequence_to_array(object sequence, object mask, object size, diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 4062b99500f..d541e635d8b 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -20,8 +20,8 @@ from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCap import warnings from cython import sizeof -from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper -# from pyarrow.includes.libarrow_memory cimport DeviceMapper +# from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper +from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultMemoryMapper cdef class ChunkedArray(_PandasConvertible): From 42e883c70a94fd67565ddd3c0591344e6cc420ad Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 19 Mar 2024 14:08:13 +0100 Subject: [PATCH 03/12] try with #define --- python/pyarrow/array.pxi | 7 ++++--- python/pyarrow/includes/libarrow_memory.pxd | 8 ++++---- python/pyarrow/table.pxi | 7 ++++--- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 24582a9796d..8b68313f523 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -22,7 +22,8 @@ import warnings from cython import sizeof # from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper -from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultMemoryMapper +# from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultDeviceMemoryMapper +from pyarrow.includes.libarrow_memory cimport CDefaultDeviceMemoryMapper cdef _sequence_to_array(object sequence, object mask, object size, @@ -1838,13 +1839,13 @@ cdef class Array(_PandasConvertible): c_array = GetResultValue( ImportDeviceArray( c_ptr, c_type_ptr, - DefaultMemoryMapper) + CDefaultDeviceMemoryMapper) ) else: with nogil: c_array = GetResultValue( ImportDeviceArray( c_ptr, c_type, - DefaultMemoryMapper) + CDefaultDeviceMemoryMapper) ) return pyarrow_wrap_array(c_array) diff --git a/python/pyarrow/includes/libarrow_memory.pxd b/python/pyarrow/includes/libarrow_memory.pxd index 463beb6f0b6..9c78433f89d 100644 --- a/python/pyarrow/includes/libarrow_memory.pxd +++ b/python/pyarrow/includes/libarrow_memory.pxd @@ -24,11 +24,11 @@ cdef extern from *: """ #ifdef ARROW_CUDA #include "arrow/gpu/cuda_api.h" - using DeviceMapper = arrow::cuda::DefaultMemoryMapper; + #define CDefaultDeviceMemoryMapper(device_type, device_id) arrow::cuda::DefaultMemoryMapper(device_typem device_id) #else #include "arrow/c/bridge.h" - using DeviceMapper = arrow::DefaultDeviceMapper; + #define CDefaultDeviceMemoryMapper(device_type, device_id) arrow::DefaultDeviceMapper(device_type, device_id) #endif """ - CResult[shared_ptr[CMemoryManager]] DeviceMapper( - ArrowDeviceType device_type, int64_t device_id) + CResult[shared_ptr[CMemoryManager]] CDefaultDeviceMemoryMapper" CDefaultDeviceMemoryMapper"( + ArrowDeviceType device_type, int64_t device_id) nogil diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index d541e635d8b..20baa385a09 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -21,7 +21,8 @@ import 
warnings from cython import sizeof # from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper -from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultMemoryMapper +# from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultDeviceMemoryMapper +from pyarrow.includes.libarrow_memory cimport CDefaultDeviceMemoryMapper cdef class ChunkedArray(_PandasConvertible): @@ -3601,11 +3602,11 @@ cdef class RecordBatch(_Tabular): with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( c_ptr, c_schema_ptr, - DefaultMemoryMapper)) + CDefaultDeviceMemoryMapper)) else: with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema, DefaultMemoryMapper)) + c_ptr, c_schema, CDefaultDeviceMemoryMapper)) return pyarrow_wrap_batch(c_batch) From 05dd7a5d3796636bc263826388653d2ebc57c804 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 19 Mar 2024 14:31:57 +0100 Subject: [PATCH 04/12] try with helper function duplicated in _cuda.pyx --- python/CMakeLists.txt | 1 - python/pyarrow/_cuda.pyx | 46 ++++++++++++++ python/pyarrow/array.pxi | 70 +++++++++++++-------- python/pyarrow/includes/libarrow_cuda.pxd | 2 +- python/pyarrow/includes/libarrow_memory.pxd | 34 ---------- python/pyarrow/lib.pxd | 2 + python/pyarrow/table.pxi | 48 +++++++------- 7 files changed, 121 insertions(+), 82 deletions(-) delete mode 100644 python/pyarrow/includes/libarrow_memory.pxd diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 6fece1ab9bc..af65ea7d614 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -796,7 +796,6 @@ install(FILES "${ARROW_PYTHON_BINARY_DIR}/lib_api.h" "${ARROW_PYTHON_BINARY_DIR} if(PYARROW_BUILD_CUDA) target_link_libraries(_cuda PRIVATE ${CUDA_LINK_LIBS}) - target_link_libraries(lib PRIVATE ${CUDA_LINK_LIBS}) endif() if(PYARROW_BUILD_FLIGHT) diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx index ba799a105e7..e0b56a649ed 100644 --- a/python/pyarrow/_cuda.pyx +++ b/python/pyarrow/_cuda.pyx @@ -965,6 +965,52 @@ def read_record_batch(object buffer, object schema, *, return pyarrow_wrap_batch(batch) +def _import_device_array(in_ptr, type): + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_type_ptr + shared_ptr[CArray] c_array + + c_type = pyarrow_unwrap_data_type(type) + if c_type == nullptr: + # Not a DataType object, perhaps a raw ArrowSchema pointer + c_type_ptr = _as_c_pointer(type) + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, + c_type_ptr, + CudaDefaultMemoryMapper) + ) + else: + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, c_type, + CudaDefaultMemoryMapper) + ) + return pyarrow_wrap_array(c_array) + + +def _import_device_recordbatch(in_ptr, schema): + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_schema_ptr + shared_ptr[CRecordBatch] c_batch + + c_schema = pyarrow_unwrap_schema(schema) + if c_schema == nullptr: + # Not a Schema object, perhaps a raw ArrowSchema pointer + c_schema_ptr = _as_c_pointer(schema, allow_null=True) + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema_ptr, + CudaDefaultMemoryMapper)) + else: + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema, CudaDefaultMemoryMapper)) + return pyarrow_wrap_batch(c_batch) + + # Public API diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 8b68313f523..34022e2b81f 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -21,9 +21,50 @@ import os import warnings from cython import sizeof -# from 
pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper -# from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultDeviceMemoryMapper -from pyarrow.includes.libarrow_memory cimport CDefaultDeviceMemoryMapper + +def _import_device_array(in_ptr, type): + """ + Import Array from a C ArrowDeviceArray struct, given its pointer + and the imported array type. + + Parameters + ---------- + in_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + type: DataType or int + Either a DataType object, or the raw pointer to a C ArrowSchema + struct. + + This is a low-level function intended for expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_type_ptr + shared_ptr[CArray] c_array + + c_type = pyarrow_unwrap_data_type(type) + if c_type == nullptr: + # Not a DataType object, perhaps a raw ArrowSchema pointer + c_type_ptr = _as_c_pointer(type) + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, + c_type_ptr, + DefaultDeviceMapper) + ) + else: + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, c_type, + DefaultDeviceMapper) + ) + return pyarrow_wrap_array(c_array) + + +try: + from pyarrow._cuda import _import_device_array +except ImportError: + pass cdef _sequence_to_array(object sequence, object mask, object size, @@ -1826,28 +1867,7 @@ cdef class Array(_PandasConvertible): This is a low-level function intended for expert users. """ - cdef: - void* c_ptr = _as_c_pointer(in_ptr) - void* c_type_ptr - shared_ptr[CArray] c_array - - c_type = pyarrow_unwrap_data_type(type) - if c_type == nullptr: - # Not a DataType object, perhaps a raw ArrowSchema pointer - c_type_ptr = _as_c_pointer(type) - with nogil: - c_array = GetResultValue( - ImportDeviceArray( c_ptr, - c_type_ptr, - CDefaultDeviceMemoryMapper) - ) - else: - with nogil: - c_array = GetResultValue( - ImportDeviceArray( c_ptr, c_type, - CDefaultDeviceMemoryMapper) - ) - return pyarrow_wrap_array(c_array) + return _import_device_array(in_ptr, type) def __dlpack__(self, stream=None): """Export a primitive array as a DLPack capsule. diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd index ac9c9f6a0e7..c7286e90122 100644 --- a/python/pyarrow/includes/libarrow_cuda.pxd +++ b/python/pyarrow/includes/libarrow_cuda.pxd @@ -93,7 +93,7 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil: CResult[shared_ptr[CCudaHostBuffer]] AllocateCudaHostBuffer( int device_number, const int64_t size) - CResult[shared_ptr[CMemoryManager]] DefaultMemoryMapper( + CResult[shared_ptr[CMemoryManager]] CudaDefaultMemoryMapper" DefaultMemoryMapper"( ArrowDeviceType device_type, int64_t device_id) # Cuda prefix is added to avoid picking up arrow::cuda functions diff --git a/python/pyarrow/includes/libarrow_memory.pxd b/python/pyarrow/includes/libarrow_memory.pxd deleted file mode 100644 index 9c78433f89d..00000000000 --- a/python/pyarrow/includes/libarrow_memory.pxd +++ /dev/null @@ -1,34 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# distutils: language = c++ - -from pyarrow.includes.libarrow cimport * - - -cdef extern from *: - """ - #ifdef ARROW_CUDA - #include "arrow/gpu/cuda_api.h" - #define CDefaultDeviceMemoryMapper(device_type, device_id) arrow::cuda::DefaultMemoryMapper(device_typem device_id) - #else - #include "arrow/c/bridge.h" - #define CDefaultDeviceMemoryMapper(device_type, device_id) arrow::DefaultDeviceMapper(device_type, device_id) - #endif - """ - CResult[shared_ptr[CMemoryManager]] CDefaultDeviceMemoryMapper" CDefaultDeviceMemoryMapper"( - ArrowDeviceType device_type, int64_t device_id) nogil diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index b1187a77c2a..1342c38dec0 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -643,6 +643,8 @@ cdef shared_ptr[const CKeyValueMetadata] pyarrow_unwrap_metadata( cdef object pyarrow_wrap_metadata( const shared_ptr[const CKeyValueMetadata]& meta) +cdef void* _as_c_pointer(v, allow_null=*) except * + # # Public Cython API for 3rd party code # diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 20baa385a09..a954fb6effa 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -20,9 +20,32 @@ from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCap import warnings from cython import sizeof -# from pyarrow.includes.libarrow_cuda cimport DefaultMemoryMapper -# from pyarrow.includes.libarrow cimport DefaultDeviceMapper as DefaultDeviceMemoryMapper -from pyarrow.includes.libarrow_memory cimport CDefaultDeviceMemoryMapper + +def _import_device_recordbatch(in_ptr, schema): + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_schema_ptr + shared_ptr[CRecordBatch] c_batch + + c_schema = pyarrow_unwrap_schema(schema) + if c_schema == nullptr: + # Not a Schema object, perhaps a raw ArrowSchema pointer + c_schema_ptr = _as_c_pointer(schema, allow_null=True) + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema_ptr, + DefaultDeviceMapper)) + else: + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema, DefaultDeviceMapper)) + return pyarrow_wrap_batch(c_batch) + + +try: + from pyarrow._cuda import _import_device_recordbatch +except ImportError: + pass cdef class ChunkedArray(_PandasConvertible): @@ -3590,24 +3613,7 @@ cdef class RecordBatch(_Tabular): This is a low-level function intended for expert users. 
""" - cdef: - void* c_ptr = _as_c_pointer(in_ptr) - void* c_schema_ptr - shared_ptr[CRecordBatch] c_batch - - c_schema = pyarrow_unwrap_schema(schema) - if c_schema == nullptr: - # Not a Schema object, perhaps a raw ArrowSchema pointer - c_schema_ptr = _as_c_pointer(schema, allow_null=True) - with nogil: - c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema_ptr, - CDefaultDeviceMemoryMapper)) - else: - with nogil: - c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema, CDefaultDeviceMemoryMapper)) - return pyarrow_wrap_batch(c_batch) + return _import_device_recordbatch(in_ptr, schema) def _reconstruct_record_batch(columns, schema): From 83b0d587f62f5def2e213d094daee4e8545bbab0 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 19 Mar 2024 15:24:30 +0000 Subject: [PATCH 05/12] fixup import from pyarrow._cuda --- python/pyarrow/_cuda.pyx | 18 +++++++++++------- python/pyarrow/array.pxi | 22 +++++----------------- python/pyarrow/includes/libarrow_cuda.pxd | 5 +++-- python/pyarrow/lib.pyx | 8 +++++--- python/pyarrow/table.pxi | 8 +++++--- 5 files changed, 29 insertions(+), 32 deletions(-) diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx index e0b56a649ed..24c232ddfb4 100644 --- a/python/pyarrow/_cuda.pyx +++ b/python/pyarrow/_cuda.pyx @@ -965,7 +965,8 @@ def read_record_batch(object buffer, object schema, *, return pyarrow_wrap_batch(batch) -def _import_device_array(in_ptr, type): +def _import_device_array_cuda(in_ptr, type): + # equivalent to the definition in array.pxi but using CudaDefaultMemoryMapper cdef: void* c_ptr = _as_c_pointer(in_ptr) void* c_type_ptr @@ -978,19 +979,20 @@ def _import_device_array(in_ptr, type): with nogil: c_array = GetResultValue( ImportDeviceArray( c_ptr, - c_type_ptr, - CudaDefaultMemoryMapper) + c_type_ptr, + CudaDefaultMemoryMapper) ) else: with nogil: c_array = GetResultValue( ImportDeviceArray( c_ptr, c_type, - CudaDefaultMemoryMapper) + CudaDefaultMemoryMapper) ) return pyarrow_wrap_array(c_array) -def _import_device_recordbatch(in_ptr, schema): +def _import_device_recordbatch_cuda(in_ptr, schema): + # equivalent to the definition in table.pxi but using CudaDefaultMemoryMapper cdef: void* c_ptr = _as_c_pointer(in_ptr) void* c_schema_ptr @@ -1003,11 +1005,13 @@ def _import_device_recordbatch(in_ptr, schema): with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( c_ptr, c_schema_ptr, - CudaDefaultMemoryMapper)) + CudaDefaultMemoryMapper) + ) else: with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema, CudaDefaultMemoryMapper)) + c_ptr, c_schema, CudaDefaultMemoryMapper) + ) return pyarrow_wrap_batch(c_batch) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 34022e2b81f..a8dfbaeab16 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -22,21 +22,7 @@ import warnings from cython import sizeof -def _import_device_array(in_ptr, type): - """ - Import Array from a C ArrowDeviceArray struct, given its pointer - and the imported array type. - - Parameters - ---------- - in_ptr: int - The raw pointer to a C ArrowDeviceArray struct. - type: DataType or int - Either a DataType object, or the raw pointer to a C ArrowSchema - struct. - - This is a low-level function intended for expert users. 
- """ +def _import_device_array_cpu(in_ptr, type): cdef: void* c_ptr = _as_c_pointer(in_ptr) void* c_type_ptr @@ -62,9 +48,11 @@ def _import_device_array(in_ptr, type): try: - from pyarrow._cuda import _import_device_array + from pyarrow._cuda import _import_device_array_cuda + + _import_device_array = _import_device_array_cuda except ImportError: - pass + _import_device_array = _import_device_array_cpu cdef _sequence_to_array(object sequence, object mask, object size, diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd index c7286e90122..bb15961566a 100644 --- a/python/pyarrow/includes/libarrow_cuda.pxd +++ b/python/pyarrow/includes/libarrow_cuda.pxd @@ -93,8 +93,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil: CResult[shared_ptr[CCudaHostBuffer]] AllocateCudaHostBuffer( int device_number, const int64_t size) - CResult[shared_ptr[CMemoryManager]] CudaDefaultMemoryMapper" DefaultMemoryMapper"( - ArrowDeviceType device_type, int64_t device_id) + CResult[shared_ptr[CMemoryManager]] \ + CudaDefaultMemoryMapper" arrow::cuda::DefaultMemoryMapper"( + ArrowDeviceType device_type, int64_t device_id) # Cuda prefix is added to avoid picking up arrow::cuda functions # from arrow namespace. diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 3245e50f0fe..9ebc7336a1b 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -26,6 +26,8 @@ import os import sys from cython.operator cimport dereference as deref +from cython cimport binding + from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_python cimport * from pyarrow.includes.common cimport PyObject_to_object @@ -162,6 +164,9 @@ include "pandas-shim.pxi" # Memory pools and allocation include "memory.pxi" +# File IO +include "io.pxi" + # DataType, Field, Schema include "types.pxi" @@ -183,9 +188,6 @@ include "tensor.pxi" # DLPack include "_dlpack.pxi" -# File IO -include "io.pxi" - # IPC / Messaging include "ipc.pxi" diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index a954fb6effa..f8b3fe9274b 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -21,7 +21,7 @@ import warnings from cython import sizeof -def _import_device_recordbatch(in_ptr, schema): +def _import_device_recordbatch_cpu(in_ptr, schema): cdef: void* c_ptr = _as_c_pointer(in_ptr) void* c_schema_ptr @@ -43,9 +43,11 @@ def _import_device_recordbatch(in_ptr, schema): try: - from pyarrow._cuda import _import_device_recordbatch + from pyarrow._cuda import _import_device_recordbatch_cuda + + _import_device_recordbatch = _import_device_recordbatch_cuda except ImportError: - pass + _import_device_recordbatch = _import_device_recordbatch_cpu cdef class ChunkedArray(_PandasConvertible): From fd08ac16f7cf2857130aff1d3d8187f1f85ac1c5 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 19 Mar 2024 16:32:11 +0100 Subject: [PATCH 06/12] fix linting issues in test_cuda.py --- python/pyarrow/tests/test_cuda.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py index bf52b77d179..400db2643bd 100644 --- a/python/pyarrow/tests/test_cuda.py +++ b/python/pyarrow/tests/test_cuda.py @@ -805,7 +805,7 @@ def _arr_copy_to_host(carr): cbuf.address, cbuf.size, cbuf ).copy_to_host() buffers.append(buf) - + child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:]) new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child]) return new @@ -901,7 
+901,6 @@ def test_device_interface_batch_array(): batch = make_recordbatch(10) schema = batch.schema - hbuf = batch.serialize() cbuf = cuda.serialize_record_batch(batch, global_context) cbatch = cuda.read_record_batch(cbuf, schema) From 1bfc05c247952ec4b32ab0a3bb1ff235ca1ab1da Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 27 Mar 2024 12:52:50 +0100 Subject: [PATCH 07/12] undo non-test changes + try import pyarrow.cuda instead to register --- python/pyarrow/__init__.py | 7 +++ python/pyarrow/_cuda.pyx | 50 --------------------- python/pyarrow/array.pxi | 54 +++++++++-------------- python/pyarrow/includes/libarrow.pxd | 20 +++------ python/pyarrow/includes/libarrow_cuda.pxd | 4 -- python/pyarrow/lib.pxd | 2 - python/pyarrow/lib.pyx | 8 ++-- python/pyarrow/table.pxi | 48 +++++++------------- 8 files changed, 52 insertions(+), 141 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 936f4736977..5cab1d3cc28 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -281,6 +281,13 @@ def print_entry(label, value): import pyarrow.types as types +try: + # Try importing the cuda module to ensure libarrow_cuda gets loaded + # to register the CUDA device for the C Data Interface import + import pyarrow.cuda +except ImportError: + pass + # ---------------------------------------------------------------------- # Deprecations diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx index 24c232ddfb4..ba799a105e7 100644 --- a/python/pyarrow/_cuda.pyx +++ b/python/pyarrow/_cuda.pyx @@ -965,56 +965,6 @@ def read_record_batch(object buffer, object schema, *, return pyarrow_wrap_batch(batch) -def _import_device_array_cuda(in_ptr, type): - # equivalent to the definition in array.pxi but using CudaDefaultMemoryMapper - cdef: - void* c_ptr = _as_c_pointer(in_ptr) - void* c_type_ptr - shared_ptr[CArray] c_array - - c_type = pyarrow_unwrap_data_type(type) - if c_type == nullptr: - # Not a DataType object, perhaps a raw ArrowSchema pointer - c_type_ptr = _as_c_pointer(type) - with nogil: - c_array = GetResultValue( - ImportDeviceArray( c_ptr, - c_type_ptr, - CudaDefaultMemoryMapper) - ) - else: - with nogil: - c_array = GetResultValue( - ImportDeviceArray( c_ptr, c_type, - CudaDefaultMemoryMapper) - ) - return pyarrow_wrap_array(c_array) - - -def _import_device_recordbatch_cuda(in_ptr, schema): - # equivalent to the definition in table.pxi but using CudaDefaultMemoryMapper - cdef: - void* c_ptr = _as_c_pointer(in_ptr) - void* c_schema_ptr - shared_ptr[CRecordBatch] c_batch - - c_schema = pyarrow_unwrap_schema(schema) - if c_schema == nullptr: - # Not a Schema object, perhaps a raw ArrowSchema pointer - c_schema_ptr = _as_c_pointer(schema, allow_null=True) - with nogil: - c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema_ptr, - CudaDefaultMemoryMapper) - ) - else: - with nogil: - c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema, CudaDefaultMemoryMapper) - ) - return pyarrow_wrap_batch(c_batch) - - # Public API diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 163d1ffb0ee..59d2e91ef6c 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -22,39 +22,6 @@ import warnings from cython import sizeof -def _import_device_array_cpu(in_ptr, type): - cdef: - void* c_ptr = _as_c_pointer(in_ptr) - void* c_type_ptr - shared_ptr[CArray] c_array - - c_type = pyarrow_unwrap_data_type(type) - if c_type == nullptr: - # Not a DataType object, perhaps a raw ArrowSchema pointer - 
c_type_ptr = _as_c_pointer(type) - with nogil: - c_array = GetResultValue( - ImportDeviceArray( c_ptr, - c_type_ptr, - DefaultDeviceMapper) - ) - else: - with nogil: - c_array = GetResultValue( - ImportDeviceArray( c_ptr, c_type, - DefaultDeviceMapper) - ) - return pyarrow_wrap_array(c_array) - - -try: - from pyarrow._cuda import _import_device_array_cuda - - _import_device_array = _import_device_array_cuda -except ImportError: - _import_device_array = _import_device_array_cpu - - cdef _sequence_to_array(object sequence, object mask, object size, DataType type, CMemoryPool* pool, c_bool from_pandas): cdef: @@ -1867,7 +1834,26 @@ cdef class Array(_PandasConvertible): This is a low-level function intended for expert users. """ - return _import_device_array(in_ptr, type) + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_type_ptr + shared_ptr[CArray] c_array + + c_type = pyarrow_unwrap_data_type(type) + if c_type == nullptr: + # Not a DataType object, perhaps a raw ArrowSchema pointer + c_type_ptr = _as_c_pointer(type) + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, + c_type_ptr) + ) + else: + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, c_type) + ) + return pyarrow_wrap_array(c_array) def __dlpack__(self, stream=None): """Export a primitive array as a DLPack capsule. diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 1d28f944818..9e5e3d3fa68 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -346,9 +346,6 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CResult[unique_ptr[CResizableBuffer]] AllocateResizableBuffer( const int64_t size, CMemoryPool* pool) - cdef cppclass CMemoryManager" arrow::MemoryManager": - pass - cdef cppclass CSyncEvent" arrow::Device::SyncEvent": pass @@ -2919,8 +2916,6 @@ cdef extern from "arrow/c/abi.h": cdef struct ArrowArrayStream: void (*release)(ArrowArrayStream*) noexcept nogil - ctypedef int32_t ArrowDeviceType - cdef struct ArrowDeviceArray: pass @@ -2956,25 +2951,20 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*) CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*) - ctypedef CResult[shared_ptr[CMemoryManager]] CDeviceMemoryMapper( - ArrowDeviceType, int64_t) - - CResult[shared_ptr[CMemoryManager]] DefaultDeviceMapper( - ArrowDeviceType device_type, int64_t device_id) - CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent], ArrowDeviceArray* out, ArrowSchema*) CResult[shared_ptr[CArray]] ImportDeviceArray( - ArrowDeviceArray*, shared_ptr[CDataType], const CDeviceMemoryMapper&) + ArrowDeviceArray*, shared_ptr[CDataType]) CResult[shared_ptr[CArray]] ImportDeviceArray( - ArrowDeviceArray*, ArrowSchema*, const CDeviceMemoryMapper&) + ArrowDeviceArray*, ArrowSchema*) CStatus ExportDeviceRecordBatch(const CRecordBatch&, shared_ptr[CSyncEvent], ArrowDeviceArray* out, ArrowSchema*) CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch( - ArrowDeviceArray*, shared_ptr[CSchema], const CDeviceMemoryMapper&) + ArrowDeviceArray*, shared_ptr[CSchema]) CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch( - ArrowDeviceArray*, ArrowSchema*, const CDeviceMemoryMapper&) + ArrowDeviceArray*, ArrowSchema*) + cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) diff --git a/python/pyarrow/includes/libarrow_cuda.pxd 
b/python/pyarrow/includes/libarrow_cuda.pxd index bb15961566a..3ac943cf941 100644 --- a/python/pyarrow/includes/libarrow_cuda.pxd +++ b/python/pyarrow/includes/libarrow_cuda.pxd @@ -93,10 +93,6 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil: CResult[shared_ptr[CCudaHostBuffer]] AllocateCudaHostBuffer( int device_number, const int64_t size) - CResult[shared_ptr[CMemoryManager]] \ - CudaDefaultMemoryMapper" arrow::cuda::DefaultMemoryMapper"( - ArrowDeviceType device_type, int64_t device_id) - # Cuda prefix is added to avoid picking up arrow::cuda functions # from arrow namespace. CResult[shared_ptr[CCudaBuffer]] \ diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 1342c38dec0..b1187a77c2a 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -643,8 +643,6 @@ cdef shared_ptr[const CKeyValueMetadata] pyarrow_unwrap_metadata( cdef object pyarrow_wrap_metadata( const shared_ptr[const CKeyValueMetadata]& meta) -cdef void* _as_c_pointer(v, allow_null=*) except * - # # Public Cython API for 3rd party code # diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 9ebc7336a1b..3245e50f0fe 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -26,8 +26,6 @@ import os import sys from cython.operator cimport dereference as deref -from cython cimport binding - from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_python cimport * from pyarrow.includes.common cimport PyObject_to_object @@ -164,9 +162,6 @@ include "pandas-shim.pxi" # Memory pools and allocation include "memory.pxi" -# File IO -include "io.pxi" - # DataType, Field, Schema include "types.pxi" @@ -188,6 +183,9 @@ include "tensor.pxi" # DLPack include "_dlpack.pxi" +# File IO +include "io.pxi" + # IPC / Messaging include "ipc.pxi" diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 8b71249a52c..1ab3fd04ed9 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -20,36 +20,6 @@ from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCap import warnings from cython import sizeof - -def _import_device_recordbatch_cpu(in_ptr, schema): - cdef: - void* c_ptr = _as_c_pointer(in_ptr) - void* c_schema_ptr - shared_ptr[CRecordBatch] c_batch - - c_schema = pyarrow_unwrap_schema(schema) - if c_schema == nullptr: - # Not a Schema object, perhaps a raw ArrowSchema pointer - c_schema_ptr = _as_c_pointer(schema, allow_null=True) - with nogil: - c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema_ptr, - DefaultDeviceMapper)) - else: - with nogil: - c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema, DefaultDeviceMapper)) - return pyarrow_wrap_batch(c_batch) - - -try: - from pyarrow._cuda import _import_device_recordbatch_cuda - - _import_device_recordbatch = _import_device_recordbatch_cuda -except ImportError: - _import_device_recordbatch = _import_device_recordbatch_cpu - - cdef class ChunkedArray(_PandasConvertible): """ An array-like composed from a (possibly empty) collection of pyarrow.Arrays @@ -3638,7 +3608,23 @@ cdef class RecordBatch(_Tabular): This is a low-level function intended for expert users. 
""" - return _import_device_recordbatch(in_ptr, schema) + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_schema_ptr + shared_ptr[CRecordBatch] c_batch + + c_schema = pyarrow_unwrap_schema(schema) + if c_schema == nullptr: + # Not a Schema object, perhaps a raw ArrowSchema pointer + c_schema_ptr = _as_c_pointer(schema, allow_null=True) + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema_ptr)) + else: + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema)) + return pyarrow_wrap_batch(c_batch) def _reconstruct_record_batch(columns, schema): From c35df809ce9b961038f7ab3a5fbf14e866f427bf Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 27 Mar 2024 13:20:08 +0000 Subject: [PATCH 08/12] import on demand --- python/pyarrow/__init__.py | 7 ------- python/pyarrow/array.pxi | 2 ++ python/pyarrow/lib.pyx | 13 +++++++++++++ python/pyarrow/table.pxi | 2 ++ 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 5cab1d3cc28..936f4736977 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -281,13 +281,6 @@ def print_entry(label, value): import pyarrow.types as types -try: - # Try importing the cuda module to ensure libarrow_cuda gets loaded - # to register the CUDA device for the C Data Interface import - import pyarrow.cuda -except ImportError: - pass - # ---------------------------------------------------------------------- # Deprecations diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 59d2e91ef6c..04973401007 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1839,6 +1839,8 @@ cdef class Array(_PandasConvertible): void* c_type_ptr shared_ptr[CArray] c_array + _ensure_cuda_loaded() + c_type = pyarrow_unwrap_data_type(type) if c_type == nullptr: # Not a DataType object, perhaps a raw ArrowSchema pointer diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 3245e50f0fe..96e0484780a 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -125,6 +125,7 @@ UnionMode_DENSE = _UnionMode_DENSE __pc = None __pac = None +__cuda_loaded = False def _pc(): @@ -143,6 +144,18 @@ def _pac(): return __pac +def _ensure_cuda_loaded(): + # Try importing the cuda module to ensure libarrow_cuda gets loaded + # to register the CUDA device for the C Data Interface import + global __cuda_loaded + if not __cuda_loaded: + try: + import pyarrow.cuda + except ImportError: + pass + __cuda_loaded = True + + def _gdb_test_session(): GdbTestSession() diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 1ab3fd04ed9..c396d53a8ba 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3613,6 +3613,8 @@ cdef class RecordBatch(_Tabular): void* c_schema_ptr shared_ptr[CRecordBatch] c_batch + _ensure_cuda_loaded() + c_schema = pyarrow_unwrap_schema(schema) if c_schema == nullptr: # Not a Schema object, perhaps a raw ArrowSchema pointer From 76a3a5e1eabf0ab4822b7d98fa56b01f936027a4 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 27 Mar 2024 13:24:15 +0000 Subject: [PATCH 09/12] linting: suppress unused import error --- python/pyarrow/lib.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 96e0484780a..cbbfb85c066 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -150,7 +150,7 @@ def _ensure_cuda_loaded(): global __cuda_loaded if not __cuda_loaded: try: - import 
pyarrow.cuda + import pyarrow.cuda # noqa except ImportError: pass __cuda_loaded = True From 326b730db45743cbbde2bacc3babb814056644d8 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 27 Mar 2024 15:16:54 +0100 Subject: [PATCH 10/12] linter second try --- python/pyarrow/lib.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index cbbfb85c066..39b13acb466 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -150,7 +150,7 @@ def _ensure_cuda_loaded(): global __cuda_loaded if not __cuda_loaded: try: - import pyarrow.cuda # noqa + import pyarrow.cuda # no-cython-lint except ImportError: pass __cuda_loaded = True From 73bfca140be2b6642b109e6c6a81977ccf41a868 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 28 Mar 2024 08:39:17 +0100 Subject: [PATCH 11/12] only load on demand if we actually receive a CUDA device array --- python/pyarrow/array.pxi | 10 +++++----- python/pyarrow/includes/libarrow.pxd | 4 +++- python/pyarrow/table.pxi | 9 +++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 04973401007..b521eed5ec7 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1835,11 +1835,12 @@ cdef class Array(_PandasConvertible): This is a low-level function intended for expert users. """ cdef: - void* c_ptr = _as_c_pointer(in_ptr) + ArrowDeviceArray* c_device_array = _as_c_pointer(in_ptr) void* c_type_ptr shared_ptr[CArray] c_array - _ensure_cuda_loaded() + if c_device_array.device_type == 2: + _ensure_cuda_loaded() c_type = pyarrow_unwrap_data_type(type) if c_type == nullptr: @@ -1847,13 +1848,12 @@ cdef class Array(_PandasConvertible): c_type_ptr = _as_c_pointer(type) with nogil: c_array = GetResultValue( - ImportDeviceArray( c_ptr, - c_type_ptr) + ImportDeviceArray(c_device_array, c_type_ptr) ) else: with nogil: c_array = GetResultValue( - ImportDeviceArray( c_ptr, c_type) + ImportDeviceArray(c_device_array, c_type) ) return pyarrow_wrap_array(c_array) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9e5e3d3fa68..d19cf001725 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2917,7 +2917,9 @@ cdef extern from "arrow/c/abi.h": void (*release)(ArrowArrayStream*) noexcept nogil cdef struct ArrowDeviceArray: - pass + ArrowArray array + int64_t device_id + int32_t device_type cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportType(CDataType&, ArrowSchema* out) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index c396d53a8ba..2883ff0f824 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3609,11 +3609,12 @@ cdef class RecordBatch(_Tabular): This is a low-level function intended for expert users. 
""" cdef: - void* c_ptr = _as_c_pointer(in_ptr) + ArrowDeviceArray* c_device_array = _as_c_pointer(in_ptr) void* c_schema_ptr shared_ptr[CRecordBatch] c_batch - _ensure_cuda_loaded() + if c_device_array.device_type == 2: + _ensure_cuda_loaded() c_schema = pyarrow_unwrap_schema(schema) if c_schema == nullptr: @@ -3621,11 +3622,11 @@ cdef class RecordBatch(_Tabular): c_schema_ptr = _as_c_pointer(schema, allow_null=True) with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema_ptr)) + c_device_array, c_schema_ptr)) else: with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema)) + c_device_array, c_schema)) return pyarrow_wrap_batch(c_batch) From 3e9f913114e5a1e9d85e0b059612eb4ff224edfe Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 9 Apr 2024 09:30:41 +0200 Subject: [PATCH 12/12] address feedback: raise informative error message --- python/pyarrow/array.pxi | 2 +- python/pyarrow/includes/libarrow.pxd | 3 +++ python/pyarrow/lib.pyx | 16 +++++++++++----- python/pyarrow/table.pxi | 2 +- python/pyarrow/tests/test_cffi.py | 21 +++++++++++++++++++++ 5 files changed, 37 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index b521eed5ec7..5923ef45bba 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1839,7 +1839,7 @@ cdef class Array(_PandasConvertible): void* c_type_ptr shared_ptr[CArray] c_array - if c_device_array.device_type == 2: + if c_device_array.device_type == ARROW_DEVICE_CUDA: _ensure_cuda_loaded() c_type = pyarrow_unwrap_data_type(type) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index e09d3543a0d..3e2d0ed2095 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2916,6 +2916,9 @@ cdef extern from "arrow/c/abi.h": cdef struct ArrowArrayStream: void (*release)(ArrowArrayStream*) noexcept nogil + ctypedef int32_t ArrowDeviceType + cdef ArrowDeviceType ARROW_DEVICE_CUDA + cdef struct ArrowDeviceArray: ArrowArray array int64_t device_id diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 39b13acb466..4937ebe3c29 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -125,7 +125,7 @@ UnionMode_DENSE = _UnionMode_DENSE __pc = None __pac = None -__cuda_loaded = False +__cuda_loaded = None def _pc(): @@ -148,12 +148,18 @@ def _ensure_cuda_loaded(): # Try importing the cuda module to ensure libarrow_cuda gets loaded # to register the CUDA device for the C Data Interface import global __cuda_loaded - if not __cuda_loaded: + if __cuda_loaded is None: try: import pyarrow.cuda # no-cython-lint - except ImportError: - pass - __cuda_loaded = True + __cuda_loaded = True + except ImportError as exc: + __cuda_loaded = str(exc) + + if __cuda_loaded is not True: + raise ImportError( + "Trying to import data on a CUDA device, but PyArrow is not built with " + f"CUDA support.\n(importing 'pyarrow.cuda' resulted in \"{__cuda_loaded}\")." 
+ ) def _gdb_test_session(): diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 2112ddefa4b..42be5fad1a6 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3656,7 +3656,7 @@ cdef class RecordBatch(_Tabular): void* c_schema_ptr shared_ptr[CRecordBatch] c_batch - if c_device_array.device_type == 2: + if c_device_array.device_type == ARROW_DEVICE_CUDA: _ensure_cuda_loaded() c_schema = pyarrow_unwrap_schema(schema) diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index f8b2ea15d31..745e430a121 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -697,3 +697,24 @@ def test_roundtrip_chunked_array_capsule_requested_schema(): requested_capsule = requested_type.__arrow_c_schema__() with pytest.raises(NotImplementedError): chunked.__arrow_c_stream__(requested_capsule) + + +def test_import_device_no_cuda(): + try: + import pyarrow.cuda # noqa + except ImportError: + pass + else: + pytest.skip("pyarrow.cuda is available") + + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + arr = pa.array([1, 2, 3], type=pa.int64()) + arr._export_to_c_device(ptr_array) + + # patch the device type of the struct, this results in an invalid ArrowDeviceArray + # but this is just to test we raise am error before actually importing buffers + c_array.device_type = 2 # ARROW_DEVICE_CUDA + + with pytest.raises(ImportError, match="Trying to import data on a CUDA device"): + pa.Array._import_from_c_device(ptr_array, arr.type)
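
A minimal usage sketch (not part of the patch series above; it only assumes the private `_export_to_c_device` / `_import_from_c_device` methods exercised in `test_cuda.py` and `test_cffi.py`) showing how the device-aware import path behaves for a CPU-backed array once the final commits are applied. Because the exported `ArrowDeviceArray` reports a CPU device type rather than `ARROW_DEVICE_CUDA`, the lazy `_ensure_cuda_loaded()` check is never triggered and no CUDA build is required:

    import pyarrow as pa
    from pyarrow.cffi import ffi

    # Allocate the C structs that back the C Data Interface pointers.
    c_array = ffi.new("struct ArrowDeviceArray*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    arr = pa.array([1, 2, 3], type=pa.int64())

    # Export a CPU-backed array together with its schema. These are
    # low-level, expert-only private APIs, as their docstrings state.
    arr._export_to_c_device(ptr_array, ptr_schema)

    # Import it back; the device type is CPU, so pyarrow.cuda is not loaded.
    roundtripped = pa.Array._import_from_c_device(ptr_array, ptr_schema)
    assert roundtripped.equals(arr)

On a CUDA-backed `ArrowDeviceArray` the same call path first runs `_ensure_cuda_loaded()`, and raises the informative ImportError added in the last commit when PyArrow was built without CUDA support.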