Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 62 additions & 8 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,7 @@ cdef class Array(_PandasConvertible):
+"two-and-a-half"

"""
self._assert_cpu()
cdef c_string result
with nogil:
result = self.ap.Diff(deref(other.ap))
Expand All @@ -982,6 +983,7 @@ cdef class Array(_PandasConvertible):
-------
cast : Array
"""
self._assert_cpu()
return _pc().cast(self, target_type, safe=safe,
options=options, memory_pool=memory_pool)

Expand All @@ -1000,6 +1002,7 @@ cdef class Array(_PandasConvertible):
-------
view : Array
"""
self._assert_cpu()
cdef DataType type = ensure_type(target_type)
cdef shared_ptr[CArray] result
with nogil:
Expand All @@ -1022,6 +1025,7 @@ cdef class Array(_PandasConvertible):
sum : Scalar
A scalar containing the sum value.
"""
self._assert_cpu()
options = _pc().ScalarAggregateOptions(**kwargs)
return _pc().call_function('sum', [self], options)

Expand All @@ -1034,6 +1038,7 @@ cdef class Array(_PandasConvertible):
unique : Array
An array of the same data type, with deduplicated elements.
"""
self._assert_cpu()
return _pc().call_function('unique', [self])

def dictionary_encode(self, null_encoding='mask'):
Expand All @@ -1052,6 +1057,7 @@ cdef class Array(_PandasConvertible):
encoded : DictionaryArray
A dictionary-encoded version of this array.
"""
self._assert_cpu()
options = _pc().DictionaryEncodeOptions(null_encoding)
return _pc().call_function('dictionary_encode', [self], options)

Expand All @@ -1064,6 +1070,7 @@ cdef class Array(_PandasConvertible):
StructArray
An array of <input type "Values", int64 "Counts"> structs
"""
self._assert_cpu()
return _pc().call_function('value_counts', [self])

@staticmethod
Expand Down Expand Up @@ -1105,6 +1112,7 @@ cdef class Array(_PandasConvertible):
memory_pool=memory_pool)

def __reduce__(self):
    # Pickle support: serialize the underlying ArrayData and rebuild the
    # Array via _restore_array on unpickling. Pickling reads buffer
    # contents, so only CPU-resident data is allowed (enforced by the
    # _assert_cpu guard).
    self._assert_cpu()
    return _restore_array, \
        (_reduce_array_data(self.sp_array.get().data().get()),)

Expand Down Expand Up @@ -1172,6 +1180,7 @@ cdef class Array(_PandasConvertible):

@property
def null_count(self):
    # Number of null entries in the array, delegated to the C++
    # Array::null_count(). Restricted to CPU data since computing the
    # count may need to inspect the validity bitmap — TODO confirm
    # whether metadata alone would suffice for non-CPU arrays.
    self._assert_cpu()
    return self.sp_array.get().null_count()

@property
Expand All @@ -1191,9 +1200,8 @@ cdef class Array(_PandasConvertible):
The dictionary of dictionary arrays will always be counted in their
entirety even if the array only references a portion of the dictionary.
"""
cdef:
CResult[int64_t] c_size_res

self._assert_cpu()
cdef CResult[int64_t] c_size_res
with nogil:
c_size_res = ReferencedBufferSize(deref(self.ap))
size = GetResultValue(c_size_res)
Expand All @@ -1210,16 +1218,17 @@ cdef class Array(_PandasConvertible):
If a buffer is referenced multiple times then it will
only be counted once.
"""
cdef:
int64_t total_buffer_size

self._assert_cpu()
cdef int64_t total_buffer_size
total_buffer_size = TotalBufferSize(deref(self.ap))
Comment on lines +1221 to 1223
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My first thought was that getting the buffer size should be possible without looking at the actual data. But it seems that, to avoid counting identical buffers twice (which is possible if they are reused within a single array), it uses the buffer's address to distinguish buffers. Currently that uses buffer->data() in DoTotalBufferSize. data() will return null for non-CPU data, but since it doesn't actually dereference that address, I think this could instead use buffer->address(), which returns the address even for non-CPU data.

(but that could be fine for a follow-up as well)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That checks out! I tested it locally and witnessed a segfault, but hadn't dug into the reasoning.

return total_buffer_size

def __sizeof__(self):
    # Report the Python-level object size plus the buffer bytes
    # referenced by the array (self.nbytes), so sys.getsizeof reflects
    # the Arrow buffer memory as well. CPU-only because nbytes walks
    # the buffers.
    self._assert_cpu()
    return super(Array, self).__sizeof__() + self.nbytes

def __iter__(self):
    # Yield each element as a Scalar, in positional order. Element
    # access dereferences buffer contents, hence the CPU guard.
    self._assert_cpu()
    n = len(self)
    i = 0
    while i < n:
        yield self.getitem(i)
        i += 1

Expand Down Expand Up @@ -1252,6 +1261,8 @@ cdef class Array(_PandasConvertible):
If the array should be rendered as a single line of text
or if each element should be on its own line.
"""
self._assert_cpu()

cdef:
c_string result
PrettyPrintOptions options
Expand Down Expand Up @@ -1307,6 +1318,8 @@ cdef class Array(_PandasConvertible):
-------
bool
"""
self._assert_cpu()
other._assert_cpu()
return self.ap.Equals(deref(other.ap))

def __len__(self):
Expand All @@ -1331,6 +1344,7 @@ cdef class Array(_PandasConvertible):
-------
array : boolean Array
"""
self._assert_cpu()
options = _pc().NullOptions(nan_is_null=nan_is_null)
return _pc().call_function('is_null', [self], options)

Expand All @@ -1342,12 +1356,14 @@ cdef class Array(_PandasConvertible):
-------
array : boolean Array
"""
self._assert_cpu()
return _pc().call_function('is_nan', [self])

def is_valid(self):
    """
    Return BooleanArray indicating the non-null values.
    """
    # Delegates to the pyarrow.compute kernel; requires CPU data.
    self._assert_cpu()
    compute = _pc()
    return compute.is_valid(self)

def fill_null(self, fill_value):
Expand All @@ -1364,6 +1380,7 @@ cdef class Array(_PandasConvertible):
result : Array
A new array with nulls replaced by the given value.
"""
self._assert_cpu()
return _pc().fill_null(self, fill_value)

def __getitem__(self, key):
Expand All @@ -1380,12 +1397,14 @@ cdef class Array(_PandasConvertible):
-------
value : Scalar (index) or Array (slice)
"""
self._assert_cpu()
if isinstance(key, slice):
return _normalize_slice(self, key)

return self.getitem(_normalize_index(key, self.length()))

cdef getitem(self, int64_t i):
    # Internal accessor: fetch element i as a pyarrow Scalar via the
    # C++ Array::GetScalar. Callers (e.g. __getitem__, __iter__) pass
    # an already-normalized, in-bounds index.
    self._assert_cpu()
    return Scalar.wrap(GetResultValue(self.ap.GetScalar(i)))

def slice(self, offset=0, length=None):
Expand All @@ -1404,8 +1423,7 @@ cdef class Array(_PandasConvertible):
-------
sliced : RecordBatch
"""
cdef:
shared_ptr[CArray] result
cdef shared_ptr[CArray] result

if offset < 0:
raise IndexError('Offset must be non-negative')
Expand Down Expand Up @@ -1436,12 +1454,14 @@ cdef class Array(_PandasConvertible):
taken : Array
An array with the same datatype, containing the taken values.
"""
self._assert_cpu()
return _pc().take(self, indices)

def drop_null(self):
    """
    Remove missing values from an array.
    """
    # Delegates to the pyarrow.compute kernel; requires CPU data.
    self._assert_cpu()
    compute = _pc()
    return compute.drop_null(self)

def filter(self, object mask, *, null_selection_behavior='drop'):
Expand All @@ -1463,6 +1483,7 @@ cdef class Array(_PandasConvertible):
An array of the same type, with only the elements selected by
the boolean mask.
"""
self._assert_cpu()
return _pc().filter(self, mask,
null_selection_behavior=null_selection_behavior)

Expand All @@ -1488,6 +1509,7 @@ cdef class Array(_PandasConvertible):
index : Int64Scalar
The index of the value in the array (-1 if not found).
"""
self._assert_cpu()
return _pc().index(self, value, start, end, memory_pool=memory_pool)

def sort(self, order="ascending", **kwargs):
Expand All @@ -1507,16 +1529,20 @@ cdef class Array(_PandasConvertible):
-------
result : Array
"""
self._assert_cpu()
indices = _pc().sort_indices(
self,
options=_pc().SortOptions(sort_keys=[("", order)], **kwargs)
)
return self.take(indices)

def _to_pandas(self, options, types_mapper=None, **kwargs):
    # Conversion hook used by _PandasConvertible.to_pandas(). Requires
    # CPU data since conversion reads buffer contents.
    # NOTE(review): extra **kwargs are accepted but silently ignored
    # here — confirm this is intentional at the call site.
    self._assert_cpu()
    return _array_like_to_pandas(self, options, types_mapper=types_mapper)

def __array__(self, dtype=None, copy=None):
self._assert_cpu()

if copy is False:
try:
values = self.to_numpy(zero_copy_only=True)
Expand Down Expand Up @@ -1566,6 +1592,8 @@ cdef class Array(_PandasConvertible):
-------
array : numpy.ndarray
"""
self._assert_cpu()

cdef:
PyObject* out
PandasOptions c_options
Expand Down Expand Up @@ -1604,6 +1632,7 @@ cdef class Array(_PandasConvertible):
-------
lst : list
"""
self._assert_cpu()
return [x.as_py() for x in self]

def tolist(self):
Expand All @@ -1629,6 +1658,7 @@ cdef class Array(_PandasConvertible):
ArrowInvalid
"""
if full:
self._assert_cpu()
with nogil:
check_status(self.ap.ValidateFull())
else:
Expand Down Expand Up @@ -1737,6 +1767,8 @@ cdef class Array(_PandasConvertible):
A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
respectively.
"""
self._assert_cpu()

cdef:
ArrowArray* c_array
ArrowSchema* c_schema
Expand Down Expand Up @@ -1885,6 +1917,28 @@ cdef class Array(_PandasConvertible):
device = GetResultValue(ExportDevice(self.sp_array))
return device.device_type, device.device_id

@property
def device_type(self):
    """
    The device type where the array resides.

    Returns
    -------
    DeviceAllocationType
    """
    # Deliberately no _assert_cpu() here: this only queries device
    # metadata from the C++ array, never buffer contents, so it is
    # safe for non-CPU data.
    return _wrap_device_allocation_type(self.sp_array.get().device_type())

@property
def is_cpu(self):
    """
    Whether the array is CPU-accessible.
    """
    device = self.device_type
    return device == DeviceAllocationType.CPU

cdef void _assert_cpu(self) except *:
    # Guard used by methods that dereference buffer contents: raise if
    # the array's data does not reside on the CPU device. `except *`
    # lets this void cdef function propagate the Python exception.
    if self.sp_array.get().device_type() != CDeviceAllocationType_kCPU:
        raise NotImplementedError("Implemented only for data on CPU device")


cdef _array_like_to_pandas(obj, options, types_mapper):
cdef:
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CStatus Validate() const
CStatus ValidateFull() const
CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)
CDeviceAllocationType device_type()

shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
CResult[shared_ptr[CArray]] MakeArrayOfNull(
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ cdef class Array(_PandasConvertible):
cdef void init(self, const shared_ptr[CArray]& sp_array) except *
cdef getitem(self, int64_t i)
cdef int64_t length(self)
cdef void _assert_cpu(self) except *


cdef class Tensor(_Weakrefable):
Expand Down
Loading