diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 6c40a21db96..850378d2403 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1702,6 +1702,42 @@ cdef class Array(_PandasConvertible): _append_array_buffers(self.sp_array.get().data().get(), res) return res + def copy_to(self, destination): + """ + Construct a copy of the array with all buffers on destination + device. + + This method recursively copies the array's buffers and those of its + children onto the destination MemoryManager device and returns the + new Array. + + Parameters + ---------- + destination : pyarrow.MemoryManager or pyarrow.Device + The destination device to copy the array to. + + Returns + ------- + Array + """ + cdef: + shared_ptr[CArray] c_array + shared_ptr[CMemoryManager] c_memory_manager + + if isinstance(destination, Device): + c_memory_manager = (destination).unwrap().get().default_memory_manager() + elif isinstance(destination, MemoryManager): + c_memory_manager = (destination).unwrap() + else: + raise TypeError( + "Argument 'destination' has incorrect type (expected a " + f"pyarrow Device or MemoryManager, got {type(destination)})" + ) + + with nogil: + c_array = GetResultValue(self.ap.CopyTo(c_memory_manager)) + return pyarrow_wrap_array(c_array) + def _export_to_c(self, out_ptr, out_schema_ptr=0): """ Export to a C ArrowArray struct, given its pointer. diff --git a/python/pyarrow/device.pxi b/python/pyarrow/device.pxi index 6e603475208..26256de6209 100644 --- a/python/pyarrow/device.pxi +++ b/python/pyarrow/device.pxi @@ -64,6 +64,9 @@ cdef class Device(_Weakrefable): self.init(device) return self + cdef inline shared_ptr[CDevice] unwrap(self) nogil: + return self.device + def __eq__(self, other): if not isinstance(other, Device): return False @@ -130,6 +133,9 @@ cdef class MemoryManager(_Weakrefable): self.init(mm) return self + cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil: + return self.memory_manager + def __repr__(self): return "".format( frombytes(self.memory_manager.get().device().get().ToString()) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9b008d150f1..66a4eab5df0 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -234,7 +234,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CStatus Validate() const CStatus ValidateFull() const CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type) + CDeviceAllocationType device_type() + CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]& to) const shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data) CResult[shared_ptr[CArray]] MakeArrayOfNull( @@ -1027,6 +1029,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CRecordBatch] Slice(int64_t offset) shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length) + CResult[shared_ptr[CRecordBatch]] CopyTo(const shared_ptr[CMemoryManager]& to) const + CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major, CMemoryPool* pool) const diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 2cb302d20a8..64e79a2a6ad 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -539,6 +539,8 @@ cdef class Device(_Weakrefable): @staticmethod cdef wrap(const shared_ptr[CDevice]& device) + cdef inline shared_ptr[CDevice] unwrap(self) nogil + cdef class MemoryManager(_Weakrefable): cdef: @@ -549,6 +551,8 @@ cdef class MemoryManager(_Weakrefable): @staticmethod cdef wrap(const shared_ptr[CMemoryManager]& mm) + cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil + cdef class Buffer(_Weakrefable): cdef: diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 8f7c44e55dc..6d34c71c9df 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3569,6 +3569,41 @@ cdef class RecordBatch(_Tabular): row_major, pool)) return pyarrow_wrap_tensor(c_tensor) + def copy_to(self, destination): + """ + Copy the entire RecordBatch to destination device. + + This copies each column of the record batch to create + a new record batch where all underlying buffers for the columns have + been copied to the destination MemoryManager. + + Parameters + ---------- + destination : pyarrow.MemoryManager or pyarrow.Device + The destination device to copy the array to. + + Returns + ------- + RecordBatch + """ + cdef: + shared_ptr[CRecordBatch] c_batch + shared_ptr[CMemoryManager] c_memory_manager + + if isinstance(destination, Device): + c_memory_manager = (destination).unwrap().get().default_memory_manager() + elif isinstance(destination, MemoryManager): + c_memory_manager = (destination).unwrap() + else: + raise TypeError( + "Argument 'destination' has incorrect type (expected a " + f"pyarrow Device or MemoryManager, got {type(destination)})" + ) + + with nogil: + c_batch = GetResultValue(self.batch.CopyTo(c_memory_manager)) + return pyarrow_wrap_batch(c_batch) + def _export_to_c(self, out_ptr, out_schema_ptr=0): """ Export to a C ArrowArray struct, given its pointer. diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py index 36b97a62064..d55be651b15 100644 --- a/python/pyarrow/tests/test_cuda.py +++ b/python/pyarrow/tests/test_cuda.py @@ -827,21 +827,29 @@ def test_IPC(size): assert p.exitcode == 0 -def _arr_copy_to_host(carr): - # TODO replace below with copy to device when exposed in python - buffers = [] - for cbuf in carr.buffers(): - if cbuf is None: - buffers.append(None) - else: - buf = global_context.foreign_buffer( - cbuf.address, cbuf.size, cbuf - ).copy_to_host() - buffers.append(buf) - - child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:]) - new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child]) - return new +def test_copy_to(): + _, buf = make_random_buffer(size=10, target='device') + mm_cuda = buf.memory_manager + + for dest in [mm_cuda, mm_cuda.device]: + arr = pa.array([0, 1, 2]) + arr_cuda = arr.copy_to(dest) + assert not arr_cuda.buffers()[1].is_cpu + assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA + assert arr_cuda.buffers()[1].device == mm_cuda.device + + arr_roundtrip = arr_cuda.copy_to(pa.default_cpu_memory_manager()) + assert arr_roundtrip.equals(arr) + + batch = pa.record_batch({"col": arr}) + batch_cuda = batch.copy_to(dest) + buf_cuda = batch_cuda["col"].buffers()[1] + assert not buf_cuda.is_cpu + assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA + assert buf_cuda.device == mm_cuda.device + + batch_roundtrip = batch_cuda.copy_to(pa.default_cpu_memory_manager()) + assert batch_roundtrip.equals(batch) def test_device_interface_array(): @@ -856,19 +864,10 @@ def test_device_interface_array(): typ = pa.list_(pa.int32()) arr = pa.array([[1], [2, 42]], type=typ) - # TODO replace below with copy to device when exposed in python - cbuffers = [] - for buf in arr.buffers(): - if buf is None: - cbuffers.append(None) - else: - cbuf = global_context.new_buffer(buf.size) - cbuf.copy_from_host(buf, position=0, nbytes=buf.size) - cbuffers.append(cbuf) - - carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ - pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) - ]) + # copy to device + _, buf = make_random_buffer(size=10, target='device') + mm_cuda = buf.memory_manager + carr = arr.copy_to(mm_cuda) # Type is known up front carr._export_to_c_device(ptr_array) @@ -882,7 +881,7 @@ def test_device_interface_array(): del carr carr_new = pa.Array._import_from_c_device(ptr_array, typ) assert carr_new.type == pa.list_(pa.int32()) - arr_new = _arr_copy_to_host(carr_new) + arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) assert arr_new.equals(arr) del carr_new @@ -891,15 +890,13 @@ def test_device_interface_array(): pa.Array._import_from_c_device(ptr_array, typ) # Schema is exported and imported at the same time - carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ - pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) - ]) + carr = arr.copy_to(mm_cuda) carr._export_to_c_device(ptr_array, ptr_schema) # Delete and recreate C++ objects from exported pointers del carr carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema) assert carr_new.type == pa.list_(pa.int32()) - arr_new = _arr_copy_to_host(carr_new) + arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) assert arr_new.equals(arr) del carr_new @@ -908,21 +905,6 @@ def test_device_interface_array(): pa.Array._import_from_c_device(ptr_array, ptr_schema) -def _batch_copy_to_host(cbatch): - # TODO replace below with copy to device when exposed in python - arrs = [] - for col in cbatch.columns: - buffers = [ - global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host() - if buf is not None else None - for buf in col.buffers() - ] - new = pa.Array.from_buffers(col.type, len(col), buffers) - arrs.append(new) - - return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema) - - def test_device_interface_batch_array(): cffi = pytest.importorskip("pyarrow.cffi") ffi = cffi.ffi @@ -949,7 +931,7 @@ def test_device_interface_batch_array(): del cbatch cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema) assert cbatch_new.schema == schema - batch_new = _batch_copy_to_host(cbatch_new) + batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) assert batch_new.equals(batch) del cbatch_new @@ -964,7 +946,7 @@ def test_device_interface_batch_array(): del cbatch cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) assert cbatch_new.schema == schema - batch_new = _batch_copy_to_host(cbatch_new) + batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) assert batch_new.equals(batch) del cbatch_new diff --git a/python/pyarrow/tests/test_device.py b/python/pyarrow/tests/test_device.py index 6bdb015be1a..dc1a51e6d00 100644 --- a/python/pyarrow/tests/test_device.py +++ b/python/pyarrow/tests/test_device.py @@ -17,6 +17,8 @@ import pyarrow as pa +import pytest + def test_device_memory_manager(): mm = pa.default_cpu_memory_manager() @@ -41,3 +43,27 @@ def test_buffer_device(): assert buf.device.is_cpu assert buf.device == pa.default_cpu_memory_manager().device assert buf.memory_manager.is_cpu + + +def test_copy_to(): + mm = pa.default_cpu_memory_manager() + + arr = pa.array([0, 1, 2]) + batch = pa.record_batch({"col": arr}) + + for dest in [mm, mm.device]: + arr_copied = arr.copy_to(dest) + assert arr_copied.equals(arr) + assert arr_copied.buffers()[1].device == mm.device + assert arr_copied.buffers()[1].address != arr.buffers()[1].address + + batch_copied = batch.copy_to(dest) + assert batch_copied.equals(batch) + assert batch_copied["col"].buffers()[1].device == mm.device + assert batch_copied["col"].buffers()[1].address != arr.buffers()[1].address + + with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"): + arr.copy_to(mm.device.device_type) + + with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"): + batch.copy_to(mm.device.device_type)