From 942184b03b2ba8ccccc5a1cceb026144bbbc555b Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 20 Jun 2024 15:09:24 +0000 Subject: [PATCH 1/4] GH-42222: [Python] Add bindings for CopyTo on RecordBatch and Array classes --- python/pyarrow/array.pxi | 24 +++++++++ python/pyarrow/device.pxi | 3 ++ python/pyarrow/includes/libarrow.pxd | 3 ++ python/pyarrow/lib.pxd | 2 + python/pyarrow/table.pxi | 23 +++++++++ python/pyarrow/tests/test_cuda.py | 73 +++++++++------------------- python/pyarrow/tests/test_device.py | 13 +++++ 7 files changed, 91 insertions(+), 50 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 64a6ceaa6ea..d3cc595a10d 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1658,6 +1658,30 @@ cdef class Array(_PandasConvertible): _append_array_buffers(self.sp_array.get().data().get(), res) return res + def copy_to(self, MemoryManager memory_manager): + """ + Construct a copy of the array with all buffers on destination + Memory Manager + + This method recursively copies the array's buffers and those of its + children onto the destination MemoryManager device and returns the + new Array. + + Parameters + ---------- + memory_manager : pyarrow.MemoryManager + + Returns + ------ + Array + """ + cdef: + shared_ptr[CArray] c_array + + with nogil: + c_array = GetResultValue(self.ap.CopyTo(memory_manager.unwrap())) + return pyarrow_wrap_array(c_array) + def _export_to_c(self, out_ptr, out_schema_ptr=0): """ Export to a C ArrowArray struct, given its pointer. diff --git a/python/pyarrow/device.pxi b/python/pyarrow/device.pxi index 6e603475208..2710d18a740 100644 --- a/python/pyarrow/device.pxi +++ b/python/pyarrow/device.pxi @@ -130,6 +130,9 @@ cdef class MemoryManager(_Weakrefable): self.init(mm) return self + cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil: + return self.memory_manager + def __repr__(self): return "".format( frombytes(self.memory_manager.get().device().get().ToString()) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 53ad95f2430..1bd2b044dd3 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -234,6 +234,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CStatus Validate() const CStatus ValidateFull() const CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type) + CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]& to) const shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data) CResult[shared_ptr[CArray]] MakeArrayOfNull( @@ -1024,6 +1025,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CRecordBatch] Slice(int64_t offset) shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length) + CResult[shared_ptr[CRecordBatch]] CopyTo(const shared_ptr[CMemoryManager]& to) const + CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major, CMemoryPool* pool) const diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 1bc639cc8d2..f5e2fba39c9 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -543,6 +543,8 @@ cdef class MemoryManager(_Weakrefable): @staticmethod cdef wrap(const shared_ptr[CMemoryManager]& mm) + cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil + cdef class Buffer(_Weakrefable): cdef: diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 767e0900459..b7a12c23781 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3549,6 +3549,29 @@ cdef class RecordBatch(_Tabular): row_major, pool)) return pyarrow_wrap_tensor(c_tensor) + def copy_to(self, MemoryManager memory_manager): + """ + Copy the entire RecordBatch to destination MemoryManager. + + This copies each column of the record batch to create + a new record batch where all underlying buffers for the columns have + been copied to the destination MemoryManager. + + Parameters + ---------- + memory_manager : pyarrow.MemoryManager + + Returns + ------ + RecordBatch + """ + cdef: + shared_ptr[CRecordBatch] c_batch + + with nogil: + c_batch = GetResultValue(self.batch.CopyTo(memory_manager.unwrap())) + return pyarrow_wrap_batch(c_batch) + def _export_to_c(self, out_ptr, out_schema_ptr=0): """ Export to a C ArrowArray struct, given its pointer. diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py index 400db2643bd..51cd7a05d59 100644 --- a/python/pyarrow/tests/test_cuda.py +++ b/python/pyarrow/tests/test_cuda.py @@ -794,21 +794,20 @@ def test_IPC(size): assert p.exitcode == 0 -def _arr_copy_to_host(carr): - # TODO replace below with copy to device when exposed in python - buffers = [] - for cbuf in carr.buffers(): - if cbuf is None: - buffers.append(None) - else: - buf = global_context.foreign_buffer( - cbuf.address, cbuf.size, cbuf - ).copy_to_host() - buffers.append(buf) - - child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:]) - new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child]) - return new +def test_copy_to(): + _, buf = make_random_buffer(size=10, target='device') + mm_cuda = buf.memory_manager + + arr = pa.array([0, 1, 2]) + arr_cuda = arr.copy_to(mm_cuda) + assert not arr_cuda.buffers()[1].is_cpu + assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA + + batch = pa.record_batch({"col": arr}) + batch_cuda = batch.copy_to(mm_cuda) + buf_cuda = batch_cuda["col"].buffers()[1] + assert not buf_cuda.is_cpu + assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA def test_device_interface_array(): @@ -823,19 +822,10 @@ def test_device_interface_array(): typ = pa.list_(pa.int32()) arr = pa.array([[1], [2, 42]], type=typ) - # TODO replace below with copy to device when exposed in python - cbuffers = [] - for buf in arr.buffers(): - if buf is None: - cbuffers.append(None) - else: - cbuf = global_context.new_buffer(buf.size) - cbuf.copy_from_host(buf, position=0, nbytes=buf.size) - cbuffers.append(cbuf) - - carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ - pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) - ]) + # copy to device + _, buf = make_random_buffer(size=10, target='device') + mm_cuda = buf.memory_manager + carr = arr.copy_to(mm_cuda) # Type is known up front carr._export_to_c_device(ptr_array) @@ -849,7 +839,7 @@ def test_device_interface_array(): del carr carr_new = pa.Array._import_from_c_device(ptr_array, typ) assert carr_new.type == pa.list_(pa.int32()) - arr_new = _arr_copy_to_host(carr_new) + arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) assert arr_new.equals(arr) del carr_new @@ -858,15 +848,13 @@ def test_device_interface_array(): pa.Array._import_from_c_device(ptr_array, typ) # Schema is exported and imported at the same time - carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ - pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) - ]) + carr = arr.copy_to(mm_cuda) carr._export_to_c_device(ptr_array, ptr_schema) # Delete and recreate C++ objects from exported pointers del carr carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema) assert carr_new.type == pa.list_(pa.int32()) - arr_new = _arr_copy_to_host(carr_new) + arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) assert arr_new.equals(arr) del carr_new @@ -875,21 +863,6 @@ def test_device_interface_array(): pa.Array._import_from_c_device(ptr_array, ptr_schema) -def _batch_copy_to_host(cbatch): - # TODO replace below with copy to device when exposed in python - arrs = [] - for col in cbatch.columns: - buffers = [ - global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host() - if buf is not None else None - for buf in col.buffers() - ] - new = pa.Array.from_buffers(col.type, len(col), buffers) - arrs.append(new) - - return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema) - - def test_device_interface_batch_array(): cffi = pytest.importorskip("pyarrow.cffi") ffi = cffi.ffi @@ -916,7 +889,7 @@ def test_device_interface_batch_array(): del cbatch cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema) assert cbatch_new.schema == schema - batch_new = _batch_copy_to_host(cbatch_new) + batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) assert batch_new.equals(batch) del cbatch_new @@ -931,7 +904,7 @@ def test_device_interface_batch_array(): del cbatch cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) assert cbatch_new.schema == schema - batch_new = _batch_copy_to_host(cbatch_new) + batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) assert batch_new.equals(batch) del cbatch_new diff --git a/python/pyarrow/tests/test_device.py b/python/pyarrow/tests/test_device.py index 6bdb015be1a..e45fb3e2c60 100644 --- a/python/pyarrow/tests/test_device.py +++ b/python/pyarrow/tests/test_device.py @@ -41,3 +41,16 @@ def test_buffer_device(): assert buf.device.is_cpu assert buf.device == pa.default_cpu_memory_manager().device assert buf.memory_manager.is_cpu + + +def test_copy_to(): + mm = pa.default_cpu_memory_manager() + arr = pa.array([0, 1, 2]) + arr_copied = arr.copy_to(mm) + assert arr.equals(arr_copied) + assert arr.buffers()[1].address != arr_copied.buffers()[1].address + + batch = pa.record_batch({"col": arr}) + batch_copied = batch.copy_to(mm) + assert batch.equals(batch_copied) + assert arr.buffers()[1].address != batch_copied["col"].buffers()[1].address From 576d20d64a823a7ee3231794bd1e1330cdc941d4 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 8 Aug 2024 18:01:06 +0200 Subject: [PATCH 2/4] also accept device, expand testing --- python/pyarrow/array.pxi | 19 +++++++++++++++---- python/pyarrow/device.pxi | 3 +++ python/pyarrow/lib.pxd | 2 ++ python/pyarrow/table.pxi | 19 +++++++++++++++---- python/pyarrow/tests/test_cuda.py | 29 +++++++++++++++++++---------- python/pyarrow/tests/test_device.py | 27 ++++++++++++++++++++------- 6 files changed, 74 insertions(+), 25 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 5d8b5bfe698..73bc38d38b2 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1702,10 +1702,10 @@ cdef class Array(_PandasConvertible): _append_array_buffers(self.sp_array.get().data().get(), res) return res - def copy_to(self, MemoryManager memory_manager): + def copy_to(self, destination): """ Construct a copy of the array with all buffers on destination - Memory Manager + device. This method recursively copies the array's buffers and those of its children onto the destination MemoryManager device and returns the @@ -1713,7 +1713,7 @@ cdef class Array(_PandasConvertible): Parameters ---------- - memory_manager : pyarrow.MemoryManager + destination : pyarrow.MemoryManager or pyarrow.Device Returns ------ @@ -1721,9 +1721,20 @@ cdef class Array(_PandasConvertible): """ cdef: shared_ptr[CArray] c_array + shared_ptr[CMemoryManager] c_memory_manager + + if isinstance(destination, Device): + c_memory_manager = (destination).unwrap().get().default_memory_manager() + elif isinstance(destination, MemoryManager): + c_memory_manager = (destination).unwrap() + else: + raise TypeError( + "Argument 'destination' has incorrect type (expected a " + f"pyarrow Device or MemoryManager, got {type(destination)})" + ) with nogil: - c_array = GetResultValue(self.ap.CopyTo(memory_manager.unwrap())) + c_array = GetResultValue(self.ap.CopyTo(c_memory_manager)) return pyarrow_wrap_array(c_array) def _export_to_c(self, out_ptr, out_schema_ptr=0): diff --git a/python/pyarrow/device.pxi b/python/pyarrow/device.pxi index 2710d18a740..26256de6209 100644 --- a/python/pyarrow/device.pxi +++ b/python/pyarrow/device.pxi @@ -64,6 +64,9 @@ cdef class Device(_Weakrefable): self.init(device) return self + cdef inline shared_ptr[CDevice] unwrap(self) nogil: + return self.device + def __eq__(self, other): if not isinstance(other, Device): return False diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index bedcff9559d..99fa909f3da 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -534,6 +534,8 @@ cdef class Device(_Weakrefable): @staticmethod cdef wrap(const shared_ptr[CDevice]& device) + cdef inline shared_ptr[CDevice] unwrap(self) nogil + cdef class MemoryManager(_Weakrefable): cdef: diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 692905a5e43..3723d14b613 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3569,9 +3569,9 @@ cdef class RecordBatch(_Tabular): row_major, pool)) return pyarrow_wrap_tensor(c_tensor) - def copy_to(self, MemoryManager memory_manager): + def copy_to(self, destination): """ - Copy the entire RecordBatch to destination MemoryManager. + Copy the entire RecordBatch to destination device. This copies each column of the record batch to create a new record batch where all underlying buffers for the columns have @@ -3579,7 +3579,7 @@ cdef class RecordBatch(_Tabular): Parameters ---------- - memory_manager : pyarrow.MemoryManager + destination : pyarrow.MemoryManager or pyarrow.Device Returns ------ @@ -3587,9 +3587,20 @@ cdef class RecordBatch(_Tabular): """ cdef: shared_ptr[CRecordBatch] c_batch + shared_ptr[CMemoryManager] c_memory_manager + + if isinstance(destination, Device): + c_memory_manager = (destination).unwrap().get().default_memory_manager() + elif isinstance(destination, MemoryManager): + c_memory_manager = (destination).unwrap() + else: + raise TypeError( + "Argument 'destination' has incorrect type (expected a " + f"pyarrow Device or MemoryManager, got {type(destination)})" + ) with nogil: - c_batch = GetResultValue(self.batch.CopyTo(memory_manager.unwrap())) + c_batch = GetResultValue(self.batch.CopyTo(c_memory_manager)) return pyarrow_wrap_batch(c_batch) def _export_to_c(self, out_ptr, out_schema_ptr=0): diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py index bf4062bb73a..568f94b6d74 100644 --- a/python/pyarrow/tests/test_cuda.py +++ b/python/pyarrow/tests/test_cuda.py @@ -809,16 +809,25 @@ def test_copy_to(): _, buf = make_random_buffer(size=10, target='device') mm_cuda = buf.memory_manager - arr = pa.array([0, 1, 2]) - arr_cuda = arr.copy_to(mm_cuda) - assert not arr_cuda.buffers()[1].is_cpu - assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA - - batch = pa.record_batch({"col": arr}) - batch_cuda = batch.copy_to(mm_cuda) - buf_cuda = batch_cuda["col"].buffers()[1] - assert not buf_cuda.is_cpu - assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA + for dest in [mm_cuda, mm_cuda.device]: + arr = pa.array([0, 1, 2]) + arr_cuda = arr.copy_to(dest) + assert not arr_cuda.buffers()[1].is_cpu + assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA + assert arr_cuda.buffers()[1].device == mm_cuda.device + + arr_roundtrip = arr_cuda.copy_to(pa.default_cpu_memory_manager()) + assert arr_roundtrip.equals(arr) + + batch = pa.record_batch({"col": arr}) + batch_cuda = batch.copy_to(dest) + buf_cuda = batch_cuda["col"].buffers()[1] + assert not buf_cuda.is_cpu + assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA + assert buf_cuda.device == mm_cuda.device + + batch_roundtrip = batch_cuda.copy_to(pa.default_cpu_memory_manager()) + assert batch_roundtrip.equals(batch) def test_device_interface_array(): diff --git a/python/pyarrow/tests/test_device.py b/python/pyarrow/tests/test_device.py index e45fb3e2c60..dc1a51e6d00 100644 --- a/python/pyarrow/tests/test_device.py +++ b/python/pyarrow/tests/test_device.py @@ -17,6 +17,8 @@ import pyarrow as pa +import pytest + def test_device_memory_manager(): mm = pa.default_cpu_memory_manager() @@ -45,12 +47,23 @@ def test_buffer_device(): def test_copy_to(): mm = pa.default_cpu_memory_manager() - arr = pa.array([0, 1, 2]) - arr_copied = arr.copy_to(mm) - assert arr.equals(arr_copied) - assert arr.buffers()[1].address != arr_copied.buffers()[1].address + arr = pa.array([0, 1, 2]) batch = pa.record_batch({"col": arr}) - batch_copied = batch.copy_to(mm) - assert batch.equals(batch_copied) - assert arr.buffers()[1].address != batch_copied["col"].buffers()[1].address + + for dest in [mm, mm.device]: + arr_copied = arr.copy_to(dest) + assert arr_copied.equals(arr) + assert arr_copied.buffers()[1].device == mm.device + assert arr_copied.buffers()[1].address != arr.buffers()[1].address + + batch_copied = batch.copy_to(dest) + assert batch_copied.equals(batch) + assert batch_copied["col"].buffers()[1].device == mm.device + assert batch_copied["col"].buffers()[1].address != arr.buffers()[1].address + + with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"): + arr.copy_to(mm.device.device_type) + + with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"): + batch.copy_to(mm.device.device_type) From edfb00fe06166529efcf506cd3d2d7706aeff0e6 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Mon, 12 Aug 2024 17:01:48 +0200 Subject: [PATCH 3/4] fix docstring linting --- python/pyarrow/array.pxi | 1 + python/pyarrow/table.pxi | 1 + 2 files changed, 2 insertions(+) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 73bc38d38b2..594ebb2604c 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1714,6 +1714,7 @@ cdef class Array(_PandasConvertible): Parameters ---------- destination : pyarrow.MemoryManager or pyarrow.Device + The destination device to copy the array to. Returns ------ diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 3723d14b613..2d72bd117d1 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3580,6 +3580,7 @@ cdef class RecordBatch(_Tabular): Parameters ---------- destination : pyarrow.MemoryManager or pyarrow.Device + The destination device to copy the array to. Returns ------ From 31edba45c32167f966035c0170f4c58711b9405d Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 14 Aug 2024 17:58:34 +0200 Subject: [PATCH 4/4] fix docstring linting --- python/pyarrow/array.pxi | 2 +- python/pyarrow/table.pxi | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index cbac3907fe9..850378d2403 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1717,7 +1717,7 @@ cdef class Array(_PandasConvertible): The destination device to copy the array to. Returns - ------ + ------- Array """ cdef: diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 9f895414d99..6d34c71c9df 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3583,7 +3583,7 @@ cdef class RecordBatch(_Tabular): The destination device to copy the array to. Returns - ------ + ------- RecordBatch """ cdef: