Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:

CResult[vector[shared_ptr[CChunkedArray]]] Flatten(CMemoryPool* pool)

c_bool is_cpu() const

CStatus Validate() const
CStatus ValidateFull() const

Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ cdef class ChunkedArray(_PandasConvertible):
cdef:
shared_ptr[CChunkedArray] sp_chunked_array
CChunkedArray* chunked_array
c_bool _is_cpu
c_bool _init_is_cpu

cdef readonly:
# To allow Table to propagate metadata to pandas.Series
Expand Down
45 changes: 44 additions & 1 deletion python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ cdef class ChunkedArray(_PandasConvertible):

def __cinit__(self):
    # C-level initialization: no wrapped C++ ChunkedArray yet, and the
    # cached is_cpu answer has not been computed (see the is_cpu property).
    self.chunked_array = NULL
    self._init_is_cpu = False

def __init__(self):
raise TypeError("Do not call ChunkedArray's constructor directly, use "
Expand All @@ -69,6 +70,7 @@ cdef class ChunkedArray(_PandasConvertible):
self.chunked_array = chunked_array.get()

def __reduce__(self):
    # Pickle support: reconstruct via the module-level chunked_array()
    # factory from the materialized chunks and their type. CPU-only —
    # guarded by _assert_cpu() before any buffer is touched.
    self._assert_cpu()
    return chunked_array, (self.chunks, self.type)

@property
Expand Down Expand Up @@ -198,6 +200,7 @@ cdef class ChunkedArray(_PandasConvertible):
ArrowInvalid
"""
if full:
self._assert_cpu()
with nogil:
check_status(self.sp_chunked_array.get().ValidateFull())
else:
Expand All @@ -220,6 +223,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.null_count
1
"""
self._assert_cpu()
return self.chunked_array.null_count()

@property
Expand All @@ -245,6 +249,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.nbytes
49
"""
self._assert_cpu()
cdef:
CResult[int64_t] c_res_buffer

Expand All @@ -271,6 +276,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.get_total_buffer_size()
49
"""
self._assert_cpu()
cdef:
int64_t total_buffer_size

Expand Down Expand Up @@ -299,13 +305,14 @@ cdef class ChunkedArray(_PandasConvertible):
-------
value : Scalar (index) or ChunkedArray (slice)
"""

self._assert_cpu()
if isinstance(key, slice):
return _normalize_slice(self, key)

return self.getitem(_normalize_index(key, self.chunked_array.length()))

cdef getitem(self, int64_t i):
    # Internal scalar accessor; callers pass an already-normalized,
    # in-bounds index (see __getitem__). Guarded as CPU-only.
    self._assert_cpu()
    return Scalar.wrap(GetResultValue(self.chunked_array.GetScalar(i)))

def is_null(self, *, nan_is_null=False):
Expand Down Expand Up @@ -338,6 +345,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
options = _pc().NullOptions(nan_is_null=nan_is_null)
return _pc().call_function('is_null', [self], options)

Expand All @@ -363,6 +371,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().is_nan(self)

def is_valid(self):
Expand All @@ -388,6 +397,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().is_valid(self)

def __eq__(self, other):
Expand Down Expand Up @@ -430,6 +440,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().fill_null(self, fill_value)

def equals(self, ChunkedArray other):
Expand Down Expand Up @@ -458,6 +469,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.equals(animals)
False
"""
self._assert_cpu()
if other is None:
return False

Expand All @@ -472,6 +484,7 @@ cdef class ChunkedArray(_PandasConvertible):
return result

def _to_pandas(self, options, types_mapper=None, **kwargs):
    # Pandas-conversion hook: delegates to the shared
    # _array_like_to_pandas helper; extra **kwargs are ignored.
    # CPU-only, enforced up front.
    self._assert_cpu()
    return _array_like_to_pandas(self, options, types_mapper=types_mapper)

def to_numpy(self, zero_copy_only=False):
Expand All @@ -495,6 +508,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.to_numpy()
array([ 2, 2, 4, 4, 5, 100])
"""
self._assert_cpu()
if np is None:
raise ImportError(
"Cannot return a numpy.ndarray if NumPy is not present")
Expand Down Expand Up @@ -529,6 +543,7 @@ cdef class ChunkedArray(_PandasConvertible):
return values

def __array__(self, dtype=None, copy=None):
self._assert_cpu()
if copy is False:
raise ValueError(
"Unable to avoid a copy while creating a numpy array as requested "
Expand Down Expand Up @@ -574,6 +589,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs_seconds.type
DurationType(duration[s])
"""
self._assert_cpu()
return _pc().cast(self, target_type, safe=safe, options=options)

def dictionary_encode(self, null_encoding='mask'):
Expand Down Expand Up @@ -636,6 +652,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
options = _pc().DictionaryEncodeOptions(null_encoding)
return _pc().call_function('dictionary_encode', [self], options)

Expand Down Expand Up @@ -700,6 +717,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.type
DataType(int64)
"""
self._assert_cpu()
cdef:
vector[shared_ptr[CChunkedArray]] flattened
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
Expand Down Expand Up @@ -751,6 +769,7 @@ cdef class ChunkedArray(_PandasConvertible):
100
]
"""
self._assert_cpu()
if self.num_chunks == 0:
return array([], type=self.type)
else:
Expand Down Expand Up @@ -791,6 +810,7 @@ cdef class ChunkedArray(_PandasConvertible):
100
]
"""
self._assert_cpu()
return _pc().call_function('unique', [self])

def value_counts(self):
Expand Down Expand Up @@ -837,6 +857,7 @@ cdef class ChunkedArray(_PandasConvertible):
1
]
"""
self._assert_cpu()
return _pc().call_function('value_counts', [self])

def slice(self, offset=0, length=None):
Expand Down Expand Up @@ -959,6 +980,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().filter(self, mask, null_selection_behavior)

def index(self, value, start=None, end=None, *, memory_pool=None):
Expand Down Expand Up @@ -1006,6 +1028,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.index(4, start=3)
<pyarrow.Int64Scalar: 3>
"""
self._assert_cpu()
return _pc().index(self, value, start, end, memory_pool=memory_pool)

def take(self, object indices):
Expand Down Expand Up @@ -1052,6 +1075,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().take(self, indices)

def drop_null(self):
Expand Down Expand Up @@ -1091,6 +1115,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().drop_null(self)

def sort(self, order="ascending", **kwargs):
Expand All @@ -1110,6 +1135,7 @@ cdef class ChunkedArray(_PandasConvertible):
-------
result : ChunkedArray
"""
self._assert_cpu()
indices = _pc().sort_indices(
self,
options=_pc().SortOptions(sort_keys=[("", order)], **kwargs)
Expand Down Expand Up @@ -1209,6 +1235,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
cdef:
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
shared_ptr[CChunkedArray] c_result
Expand Down Expand Up @@ -1333,6 +1360,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.to_pylist()
[2, 2, 4, 4, None, 100]
"""
self._assert_cpu()
result = []
for i in range(self.num_chunks):
result += self.chunk(i).to_pylist()
Expand All @@ -1354,6 +1382,7 @@ cdef class ChunkedArray(_PandasConvertible):
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""
self._assert_cpu()
cdef:
ChunkedArray chunked
ArrowArrayStream* c_stream = NULL
Expand Down Expand Up @@ -1410,6 +1439,20 @@ cdef class ChunkedArray(_PandasConvertible):
self.init(c_chunked_array)
return self

@property
def is_cpu(self):
[Review comment — Contributor]: An is_cpu predicate on chunked arrays can be
defined without us forcing a single device type for chunked arrays. This
would unblock the Python checks without ruling out the possibility of arrays
with mixed device allocations.

"""
Whether all chunks in the ChunkedArray are CPU-accessible.
"""
if not self._init_is_cpu:
self._is_cpu = self.chunked_array.is_cpu()
self._init_is_cpu = True
return self._is_cpu

def _assert_cpu(self):
    # Shared guard called by every method that must read buffer contents:
    # raises NotImplementedError when any chunk is on a non-CPU device.
    if not self.is_cpu:
        raise NotImplementedError("Implemented only for data on CPU device")


def chunked_array(arrays, type=None):
"""
Expand Down
Loading