Merged
36 changes: 36 additions & 0 deletions python/pyarrow/array.pxi
@@ -1702,6 +1702,42 @@ cdef class Array(_PandasConvertible):
        _append_array_buffers(self.sp_array.get().data().get(), res)
        return res

    def copy_to(self, destination):
        """
        Construct a copy of the array with all buffers on the destination
        device.

        This method recursively copies the array's buffers and those of its
        children onto the destination MemoryManager's device and returns the
        new Array.

        Parameters
        ----------
        destination : pyarrow.MemoryManager or pyarrow.Device
            The destination device to copy the array to.

        Returns
        -------
        Array
        """
        cdef:
            shared_ptr[CArray] c_array
            shared_ptr[CMemoryManager] c_memory_manager

        if isinstance(destination, Device):
            c_memory_manager = (<Device>destination).unwrap().get().default_memory_manager()
        elif isinstance(destination, MemoryManager):
            c_memory_manager = (<MemoryManager>destination).unwrap()
        else:
            raise TypeError(
                "Argument 'destination' has incorrect type (expected a "
                f"pyarrow Device or MemoryManager, got {type(destination)})"
            )

        with nogil:
            c_array = GetResultValue(self.ap.CopyTo(c_memory_manager))
        return pyarrow_wrap_array(c_array)

    def _export_to_c(self, out_ptr, out_schema_ptr=0):
        """
        Export to a C ArrowArray struct, given its pointer.
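For illustration, a minimal usage sketch of the new `Array.copy_to` (not part of the diff). It assumes a CUDA-enabled build and obtains a CUDA `MemoryManager` from a scratch device buffer, the same way the tests below do:

```python
import pyarrow as pa
from pyarrow import cuda

# A nested array, to exercise the recursive copy of child buffers.
arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))

# Get a CUDA MemoryManager from a scratch device buffer
# (a direct MemoryManager constructor is not assumed to exist).
ctx = cuda.Context(0)
mm_cuda = ctx.new_buffer(64).memory_manager

arr_cuda = arr.copy_to(mm_cuda)  # passing a Device works as well
assert not arr_cuda.buffers()[1].is_cpu

# Round-trip back to host memory.
arr_back = arr_cuda.copy_to(pa.default_cpu_memory_manager())
assert arr_back.equals(arr)
```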
6 changes: 6 additions & 0 deletions python/pyarrow/device.pxi
@@ -64,6 +64,9 @@ cdef class Device(_Weakrefable):
        self.init(device)
        return self

    cdef inline shared_ptr[CDevice] unwrap(self) nogil:
        return self.device

Review thread on unwrap():

Contributor: Out of curiosity, why did you choose to mark unwrap with nogil?

Member (author): In practice it's what we do for almost all of our unwrap functions (and the few that don't probably should, for consistency). But the reasons are: 1) this function does not need the GIL (there is no Python interaction), so it can be marked nogil; and 2) marking it as such ensures it can be called from within a `with nogil: ...` block. Since unwrap() is typically needed when calling some C++ function, and we typically use `with nogil: ...` blocks when calling C++ functions, in practice there are quite a few places where unwrap() ends up inside such a block.

Member: The first section in this doc really helped me understand when/why to release the GIL: https://cython.readthedocs.io/en/latest/src/userguide/nogil.html

    def __eq__(self, other):
        if not isinstance(other, Device):
            return False
@@ -130,6 +133,9 @@ cdef class MemoryManager(_Weakrefable):
        self.init(mm)
        return self

    cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil:
        return self.memory_manager

    def __repr__(self):
        return "<pyarrow.MemoryManager device: {}>".format(
            frombytes(self.memory_manager.get().device().get().ToString())
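To make the nogil discussion above concrete, here is a standalone Cython sketch (illustrative only, not part of this diff). Marking a cdef function nogil is what makes it callable from inside a `with nogil:` block, which is exactly where `unwrap()` tends to be used:

```cython
cdef int doubled(int x) nogil:
    # No Python objects are touched here, so the GIL is not needed.
    return 2 * x

def compute(int x):
    cdef int result
    with nogil:
        # Legal only because doubled() is declared nogil; calling a
        # regular cdef function here would be a compile-time error.
        result = doubled(x)
    return result
```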
4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -234,7 +234,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
        CStatus Validate() const
        CStatus ValidateFull() const
        CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)

        CDeviceAllocationType device_type()
        CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]& to) const

    shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
    CResult[shared_ptr[CArray]] MakeArrayOfNull(
@@ -1027,6 +1029,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
        shared_ptr[CRecordBatch] Slice(int64_t offset)
        shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)

        CResult[shared_ptr[CRecordBatch]] CopyTo(const shared_ptr[CMemoryManager]& to) const

        CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major,
                                              CMemoryPool* pool) const

4 changes: 4 additions & 0 deletions python/pyarrow/lib.pxd
@@ -539,6 +539,8 @@ cdef class Device(_Weakrefable):
    @staticmethod
    cdef wrap(const shared_ptr[CDevice]& device)

    cdef inline shared_ptr[CDevice] unwrap(self) nogil


cdef class MemoryManager(_Weakrefable):
    cdef:
@@ -549,6 +551,8 @@ cdef class MemoryManager(_Weakrefable):
    @staticmethod
    cdef wrap(const shared_ptr[CMemoryManager]& mm)

    cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil


cdef class Buffer(_Weakrefable):
    cdef:
35 changes: 35 additions & 0 deletions python/pyarrow/table.pxi
@@ -3569,6 +3569,41 @@ cdef class RecordBatch(_Tabular):
                                                      row_major, pool))
        return pyarrow_wrap_tensor(c_tensor)

    def copy_to(self, destination):
        """
        Copy the entire RecordBatch to the destination device.

        This copies each column of the record batch to create
        a new record batch where all underlying buffers for the columns have
        been copied to the destination MemoryManager.

        Parameters
        ----------
        destination : pyarrow.MemoryManager or pyarrow.Device
            The destination device to copy the record batch to.

        Returns
        -------
        RecordBatch
        """
        cdef:
            shared_ptr[CRecordBatch] c_batch
            shared_ptr[CMemoryManager] c_memory_manager

        if isinstance(destination, Device):
            c_memory_manager = (<Device>destination).unwrap().get().default_memory_manager()
        elif isinstance(destination, MemoryManager):
            c_memory_manager = (<MemoryManager>destination).unwrap()
        else:
            raise TypeError(
                "Argument 'destination' has incorrect type (expected a "
                f"pyarrow Device or MemoryManager, got {type(destination)})"
            )

        with nogil:
            c_batch = GetResultValue(self.batch.CopyTo(c_memory_manager))
        return pyarrow_wrap_batch(c_batch)

    def _export_to_c(self, out_ptr, out_schema_ptr=0):
        """
        Export to a C ArrowArray struct, given its pointer.
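As a quick sketch of the batch-level method (not part of the diff; host-only, so no GPU is required), copying to the default CPU memory manager yields a deep copy with equal contents but freshly allocated buffers:

```python
import pyarrow as pa

batch = pa.record_batch({"col": pa.array([0, 1, 2])})
mm = pa.default_cpu_memory_manager()

copied = batch.copy_to(mm)  # a pyarrow.Device is accepted as well
assert copied.equals(batch)
# The data was copied, not shared: buffer addresses differ.
assert copied["col"].buffers()[1].address != batch["col"].buffers()[1].address
```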
82 changes: 32 additions & 50 deletions python/pyarrow/tests/test_cuda.py
@@ -827,21 +827,29 @@ def test_IPC(size):
    assert p.exitcode == 0


def _arr_copy_to_host(carr):
    # TODO replace below with copy to device when exposed in python
    buffers = []
    for cbuf in carr.buffers():
        if cbuf is None:
            buffers.append(None)
        else:
            buf = global_context.foreign_buffer(
                cbuf.address, cbuf.size, cbuf
            ).copy_to_host()
            buffers.append(buf)

    child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
    new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
    return new


def test_copy_to():
    _, buf = make_random_buffer(size=10, target='device')
    mm_cuda = buf.memory_manager

    for dest in [mm_cuda, mm_cuda.device]:
        arr = pa.array([0, 1, 2])
        arr_cuda = arr.copy_to(dest)
        assert not arr_cuda.buffers()[1].is_cpu
        assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA
        assert arr_cuda.buffers()[1].device == mm_cuda.device

        arr_roundtrip = arr_cuda.copy_to(pa.default_cpu_memory_manager())
        assert arr_roundtrip.equals(arr)

        batch = pa.record_batch({"col": arr})
        batch_cuda = batch.copy_to(dest)
        buf_cuda = batch_cuda["col"].buffers()[1]
        assert not buf_cuda.is_cpu
        assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA
        assert buf_cuda.device == mm_cuda.device

        batch_roundtrip = batch_cuda.copy_to(pa.default_cpu_memory_manager())
        assert batch_roundtrip.equals(batch)


def test_device_interface_array():
@@ -856,19 +864,10 @@ def test_device_interface_array():
    typ = pa.list_(pa.int32())
    arr = pa.array([[1], [2, 42]], type=typ)

    # TODO replace below with copy to device when exposed in python
    cbuffers = []
    for buf in arr.buffers():
        if buf is None:
            cbuffers.append(None)
        else:
            cbuf = global_context.new_buffer(buf.size)
            cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
            cbuffers.append(cbuf)

    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
    ])
    # copy to device
    _, buf = make_random_buffer(size=10, target='device')
    mm_cuda = buf.memory_manager
    carr = arr.copy_to(mm_cuda)

    # Type is known up front
    carr._export_to_c_device(ptr_array)
@@ -882,7 +881,7 @@ def test_device_interface_array():
    del carr
    carr_new = pa.Array._import_from_c_device(ptr_array, typ)
    assert carr_new.type == pa.list_(pa.int32())
    arr_new = _arr_copy_to_host(carr_new)
    arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
    assert arr_new.equals(arr)

    del carr_new
@@ -891,15 +890,13 @@ def test_device_interface_array():
        pa.Array._import_from_c_device(ptr_array, typ)

    # Schema is exported and imported at the same time
    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
    ])
    carr = arr.copy_to(mm_cuda)
    carr._export_to_c_device(ptr_array, ptr_schema)
    # Delete and recreate C++ objects from exported pointers
    del carr
    carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
    assert carr_new.type == pa.list_(pa.int32())
    arr_new = _arr_copy_to_host(carr_new)
    arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
    assert arr_new.equals(arr)

    del carr_new
@@ -908,21 +905,6 @@ def test_device_interface_array():
        pa.Array._import_from_c_device(ptr_array, ptr_schema)


def _batch_copy_to_host(cbatch):
    # TODO replace below with copy to device when exposed in python
    arrs = []
    for col in cbatch.columns:
        buffers = [
            global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host()
            if buf is not None else None
            for buf in col.buffers()
        ]
        new = pa.Array.from_buffers(col.type, len(col), buffers)
        arrs.append(new)

    return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)


def test_device_interface_batch_array():
    cffi = pytest.importorskip("pyarrow.cffi")
    ffi = cffi.ffi
@@ -949,7 +931,7 @@ def test_device_interface_batch_array():
    del cbatch
    cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
    assert cbatch_new.schema == schema
    batch_new = _batch_copy_to_host(cbatch_new)
    batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
    assert batch_new.equals(batch)

    del cbatch_new
@@ -964,7 +946,7 @@ def test_device_interface_batch_array():
    del cbatch
    cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
    assert cbatch_new.schema == schema
    batch_new = _batch_copy_to_host(cbatch_new)
    batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
    assert batch_new.equals(batch)

    del cbatch_new
26 changes: 26 additions & 0 deletions python/pyarrow/tests/test_device.py
@@ -17,6 +17,8 @@

import pyarrow as pa

import pytest


def test_device_memory_manager():
    mm = pa.default_cpu_memory_manager()
@@ -41,3 +43,27 @@ def test_buffer_device():
    assert buf.device.is_cpu
    assert buf.device == pa.default_cpu_memory_manager().device
    assert buf.memory_manager.is_cpu


def test_copy_to():
    mm = pa.default_cpu_memory_manager()

    arr = pa.array([0, 1, 2])
    batch = pa.record_batch({"col": arr})

    for dest in [mm, mm.device]:
        arr_copied = arr.copy_to(dest)
        assert arr_copied.equals(arr)
        assert arr_copied.buffers()[1].device == mm.device
        assert arr_copied.buffers()[1].address != arr.buffers()[1].address

        batch_copied = batch.copy_to(dest)
        assert batch_copied.equals(batch)
        assert batch_copied["col"].buffers()[1].device == mm.device
        assert batch_copied["col"].buffers()[1].address != arr.buffers()[1].address

    with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"):
        arr.copy_to(mm.device.device_type)

    with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"):
        batch.copy_to(mm.device.device_type)