Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit — hold Shift and click to select a range
ef8be15
support + test cuda device interface (with hardcoded dependency on li…
jorisvandenbossche Mar 6, 2024
143f511
test on cpu
jorisvandenbossche Mar 6, 2024
42e883c
try with #define
jorisvandenbossche Mar 19, 2024
05dd7a5
try with helper function duplicated in _cuda.pyx
jorisvandenbossche Mar 19, 2024
83b0d58
fixup import from pyarrow._cuda
jorisvandenbossche Mar 19, 2024
fd08ac1
fix linting issues in test_cuda.py
jorisvandenbossche Mar 19, 2024
3667071
Merge remote-tracking branch 'upstream/main' into device-interface-cuda
jorisvandenbossche Mar 27, 2024
1bfc05c
undo non-test changes + try import pyarrow.cuda instead to register
jorisvandenbossche Mar 27, 2024
65847db
Merge remote-tracking branch 'upstream/main' into device-interface-cuda
jorisvandenbossche Mar 27, 2024
c35df80
import on demand
jorisvandenbossche Mar 27, 2024
76a3a5e
linting: suppress unused import error
jorisvandenbossche Mar 27, 2024
326b730
linter second try
jorisvandenbossche Mar 27, 2024
73bfca1
only load on demand if we actually receive a CUDA device array
jorisvandenbossche Mar 28, 2024
78a7fb5
Merge remote-tracking branch 'upstream/main' into device-interface-cuda
jorisvandenbossche Apr 8, 2024
820bb47
Merge remote-tracking branch 'upstream/main' into device-interface-cuda
jorisvandenbossche Apr 9, 2024
3e9f913
address feedback: raise informative error message
jorisvandenbossche Apr 9, 2024
eca6869
Merge remote-tracking branch 'upstream/main' into device-interface-cuda
jorisvandenbossche Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1825,23 +1825,25 @@ cdef class Array(_PandasConvertible):
This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
ArrowDeviceArray* c_device_array = <ArrowDeviceArray*>_as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

if c_device_array.device_type == ARROW_DEVICE_CUDA:
_ensure_cuda_loaded()

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr)
ImportDeviceArray(c_device_array, <ArrowSchema*> c_type_ptr)
)
else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
ImportDeviceArray(c_device_array, c_type)
)
return pyarrow_wrap_array(c_array)

Expand Down
7 changes: 6 additions & 1 deletion python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2964,8 +2964,13 @@ cdef extern from "arrow/c/abi.h":
cdef struct ArrowArrayStream:
void (*release)(ArrowArrayStream*) noexcept nogil

ctypedef int32_t ArrowDeviceType
cdef ArrowDeviceType ARROW_DEVICE_CUDA

cdef struct ArrowDeviceArray:
pass
ArrowArray array
int64_t device_id
int32_t device_type

cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportType(CDataType&, ArrowSchema* out)
Expand Down
19 changes: 19 additions & 0 deletions python/pyarrow/lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ UnionMode_DENSE = _UnionMode_DENSE

__pc = None
__pac = None
__cuda_loaded = None


def _pc():
Expand All @@ -143,6 +144,24 @@ def _pac():
return __pac


def _ensure_cuda_loaded():
    """Lazily import pyarrow.cuda so libarrow_cuda gets loaded.

    Loading the module registers the CUDA device with the C Data
    Interface import machinery.  The import result is cached in the
    module-level ``__cuda_loaded`` flag: ``True`` on success, or the
    ImportError message string on failure, so the import is attempted
    only once per process.

    Raises
    ------
    ImportError
        If PyArrow was built without CUDA support.
    """
    global __cuda_loaded
    if __cuda_loaded is None:
        try:
            import pyarrow.cuda  # no-cython-lint
        except ImportError as exc:
            __cuda_loaded = str(exc)
        else:
            __cuda_loaded = True

    if __cuda_loaded is not True:
        raise ImportError(
            "Trying to import data on a CUDA device, but PyArrow is not built with "
            f"CUDA support.\n(importing 'pyarrow.cuda' resulted in \"{__cuda_loaded}\")."
        )


def _gdb_test_session():
    # Instantiate a GdbTestSession; presumably a hook invoked by the
    # gdb-based debugging tests -- TODO confirm against the caller.
    GdbTestSession()

Expand Down
9 changes: 6 additions & 3 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3752,21 +3752,24 @@ cdef class RecordBatch(_Tabular):
This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
ArrowDeviceArray* c_device_array = <ArrowDeviceArray*>_as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

if c_device_array.device_type == ARROW_DEVICE_CUDA:
_ensure_cuda_loaded()

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
c_device_array, <ArrowSchema*> c_schema_ptr))
else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema))
c_device_array, c_schema))
return pyarrow_wrap_batch(c_batch)


Expand Down
21 changes: 21 additions & 0 deletions python/pyarrow/tests/test_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,3 +705,24 @@ def test_roundtrip_chunked_array_capsule_requested_schema():
ValueError, match="Could not cast string to requested type int64"
):
chunked.__arrow_c_stream__(requested_capsule)


def test_import_device_no_cuda():
    # Only meaningful on builds without CUDA support.
    try:
        import pyarrow.cuda  # noqa
    except ImportError:
        pass
    else:
        pytest.skip("pyarrow.cuda is available")

    device_struct = ffi.new("struct ArrowDeviceArray*")
    device_ptr = int(ffi.cast("uintptr_t", device_struct))
    arr = pa.array([1, 2, 3], type=pa.int64())
    arr._export_to_c_device(device_ptr)

    # Forge the device type: this makes the struct an invalid
    # ArrowDeviceArray, but we only need to verify that an error is
    # raised before any buffers are actually imported.
    device_struct.device_type = 2  # ARROW_DEVICE_CUDA

    with pytest.raises(ImportError, match="Trying to import data on a CUDA device"):
        pa.Array._import_from_c_device(device_ptr, arr.type)
152 changes: 152 additions & 0 deletions python/pyarrow/tests/test_cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,3 +792,155 @@ def test_IPC(size):
p.start()
p.join()
assert p.exitcode == 0


def _arr_copy_to_host(carr):
    """Copy a device-resident list array back to host memory.

    Parameters
    ----------
    carr : pa.Array
        A ``list<int32>`` array whose buffers live in CUDA device memory.

    Returns
    -------
    pa.Array
        An equivalent array backed by host buffers.

    Notes
    -----
    The element counts (2 parent slots, 3 child values) are hard-coded to
    match the fixture built in ``test_device_interface_array``.
    """
    # TODO replace below with copy to device when exposed in python
    buffers = []
    for cbuf in carr.buffers():
        if cbuf is None:
            buffers.append(None)
        else:
            # Wrap the device pointer (keeping ``cbuf`` alive as the base)
            # and copy the bytes back to a host buffer.
            buf = global_context.foreign_buffer(
                cbuf.address, cbuf.size, cbuf
            ).copy_to_host()
            buffers.append(buf)

    child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
    new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
    return new


def test_device_interface_array():
    """Round-trip a CUDA-backed Array through the C device interface."""
    cffi = pytest.importorskip("pyarrow.cffi")
    ffi = cffi.ffi

    schema_struct = ffi.new("struct ArrowSchema*")
    schema_ptr = int(ffi.cast("uintptr_t", schema_struct))
    array_struct = ffi.new("struct ArrowDeviceArray*")
    array_ptr = int(ffi.cast("uintptr_t", array_struct))

    typ = pa.list_(pa.int32())
    arr = pa.array([[1], [2, 42]], type=typ)

    # TODO replace below with copy to device when exposed in python
    device_buffers = []
    for host_buf in arr.buffers():
        if host_buf is None:
            device_buffers.append(None)
            continue
        dbuf = global_context.new_buffer(host_buf.size)
        dbuf.copy_from_host(host_buf, position=0, nbytes=host_buf.size)
        device_buffers.append(dbuf)

    def _make_device_array():
        child = pa.Array.from_buffers(typ.value_type, 3, device_buffers[2:])
        return pa.Array.from_buffers(typ, 2, device_buffers[:2],
                                     children=[child])

    carr = _make_device_array()

    # Case 1: the type is known up front
    carr._export_to_c_device(array_ptr)

    # verify exported struct
    assert array_struct.device_type == 2  # ARROW_DEVICE_CUDA
    assert array_struct.device_id == global_context.device_number
    assert array_struct.array.length == 2

    # Delete, then recreate the C++ object from the exported pointer
    del carr
    carr_new = pa.Array._import_from_c_device(array_ptr, typ)
    assert carr_new.type == pa.list_(pa.int32())
    assert _arr_copy_to_host(carr_new).equals(arr)

    del carr_new
    # Now released
    with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
        pa.Array._import_from_c_device(array_ptr, typ)

    # Case 2: schema is exported and imported at the same time
    carr = _make_device_array()
    carr._export_to_c_device(array_ptr, schema_ptr)
    # Delete and recreate C++ objects from exported pointers
    del carr
    carr_new = pa.Array._import_from_c_device(array_ptr, schema_ptr)
    assert carr_new.type == pa.list_(pa.int32())
    assert _arr_copy_to_host(carr_new).equals(arr)

    del carr_new
    # Now released
    with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
        pa.Array._import_from_c_device(array_ptr, schema_ptr)


def _batch_copy_to_host(cbatch):
    """Copy a device-resident RecordBatch back to host memory, column by column."""
    # TODO replace below with copy to device when exposed in python
    host_columns = []
    for col in cbatch.columns:
        host_buffers = []
        for buf in col.buffers():
            if buf is None:
                host_buffers.append(None)
            else:
                host_buffers.append(
                    global_context.foreign_buffer(
                        buf.address, buf.size, buf
                    ).copy_to_host()
                )
        host_columns.append(
            pa.Array.from_buffers(col.type, len(col), host_buffers)
        )

    return pa.RecordBatch.from_arrays(host_columns, schema=cbatch.schema)


def test_device_interface_batch_array():
    """Round-trip a CUDA-backed RecordBatch through the C device interface."""
    cffi = pytest.importorskip("pyarrow.cffi")
    ffi = cffi.ffi

    schema_struct = ffi.new("struct ArrowSchema*")
    schema_ptr = int(ffi.cast("uintptr_t", schema_struct))
    array_struct = ffi.new("struct ArrowDeviceArray*")
    array_ptr = int(ffi.cast("uintptr_t", array_struct))

    batch = make_recordbatch(10)
    schema = batch.schema
    cbuf = cuda.serialize_record_batch(batch, global_context)
    cbatch = cuda.read_record_batch(cbuf, schema)

    # Case 1: the schema is known up front
    cbatch._export_to_c_device(array_ptr)

    # verify exported struct
    assert array_struct.device_type == 2  # ARROW_DEVICE_CUDA
    assert array_struct.device_id == global_context.device_number
    assert array_struct.array.length == 10

    # Delete, then recreate the C++ object from the exported pointer
    del cbatch
    cbatch_new = pa.RecordBatch._import_from_c_device(array_ptr, schema)
    assert cbatch_new.schema == schema
    assert _batch_copy_to_host(cbatch_new).equals(batch)

    del cbatch_new
    # Now released
    with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
        pa.RecordBatch._import_from_c_device(array_ptr, schema)

    # Case 2: schema is exported and imported at the same time
    cbatch = cuda.read_record_batch(cbuf, schema)
    cbatch._export_to_c_device(array_ptr, schema_ptr)
    # Delete and recreate C++ objects from exported pointers
    del cbatch
    cbatch_new = pa.RecordBatch._import_from_c_device(array_ptr, schema_ptr)
    assert cbatch_new.schema == schema
    assert _batch_copy_to_host(cbatch_new).equals(batch)

    del cbatch_new
    # Now released
    with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
        pa.RecordBatch._import_from_c_device(array_ptr, schema_ptr)

    # Not a struct type
    pa.int32()._export_to_c(schema_ptr)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.RecordBatch._import_from_c_device(array_ptr, schema_ptr)