diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index c2346750a19..8e6922a912a 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -983,6 +983,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CResult[vector[shared_ptr[CChunkedArray]]] Flatten(CMemoryPool* pool) + c_bool is_cpu() const + CStatus Validate() const CStatus ValidateFull() const diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index ad05ea31c91..1caf58e20e6 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -506,6 +506,8 @@ cdef class ChunkedArray(_PandasConvertible): cdef: shared_ptr[CChunkedArray] sp_chunked_array CChunkedArray* chunked_array + c_bool _is_cpu + c_bool _init_is_cpu cdef readonly: # To allow Table to propagate metadata to pandas.Series diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 9bb86236659..3b0df981e01 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -59,6 +59,7 @@ cdef class ChunkedArray(_PandasConvertible): def __cinit__(self): self.chunked_array = NULL + self._init_is_cpu = False def __init__(self): raise TypeError("Do not call ChunkedArray's constructor directly, use " @@ -69,6 +70,7 @@ cdef class ChunkedArray(_PandasConvertible): self.chunked_array = chunked_array.get() def __reduce__(self): + self._assert_cpu() return chunked_array, (self.chunks, self.type) @property @@ -198,6 +200,7 @@ cdef class ChunkedArray(_PandasConvertible): ArrowInvalid """ if full: + self._assert_cpu() with nogil: check_status(self.sp_chunked_array.get().ValidateFull()) else: @@ -220,6 +223,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.null_count 1 """ + self._assert_cpu() return self.chunked_array.null_count() @property @@ -245,6 +249,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.nbytes 49 """ + self._assert_cpu() cdef: CResult[int64_t] c_res_buffer @@ -271,6 +276,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.get_total_buffer_size() 49 """ + self._assert_cpu() cdef: int64_t total_buffer_size @@ -299,13 +305,14 @@ cdef class ChunkedArray(_PandasConvertible): ------- value : Scalar (index) or ChunkedArray (slice) """ - + self._assert_cpu() if isinstance(key, slice): return _normalize_slice(self, key) return self.getitem(_normalize_index(key, self.chunked_array.length())) cdef getitem(self, int64_t i): + self._assert_cpu() return Scalar.wrap(GetResultValue(self.chunked_array.GetScalar(i))) def is_null(self, *, nan_is_null=False): @@ -338,6 +345,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() options = _pc().NullOptions(nan_is_null=nan_is_null) return _pc().call_function('is_null', [self], options) @@ -363,6 +371,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() return _pc().is_nan(self) def is_valid(self): @@ -388,6 +397,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() return _pc().is_valid(self) def __eq__(self, other): @@ -430,6 +440,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() return _pc().fill_null(self, fill_value) def equals(self, ChunkedArray other): @@ -458,6 +469,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.equals(animals) False """ + self._assert_cpu() if other is None: return False @@ -472,6 +484,7 @@ cdef class ChunkedArray(_PandasConvertible): return result def _to_pandas(self, options, types_mapper=None, **kwargs): + self._assert_cpu() return _array_like_to_pandas(self, options, types_mapper=types_mapper) def to_numpy(self, zero_copy_only=False): @@ -495,6 +508,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.to_numpy() array([ 2, 2, 4, 4, 5, 100]) """ + self._assert_cpu() if np is None: raise ImportError( "Cannot return a numpy.ndarray if NumPy is not present") @@ -529,6 +543,7 @@ cdef class ChunkedArray(_PandasConvertible): return values def __array__(self, dtype=None, copy=None): + self._assert_cpu() if copy is False: raise ValueError( "Unable to avoid a copy while creating a numpy array as requested " @@ -574,6 +589,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs_seconds.type DurationType(duration[s]) """ + self._assert_cpu() return _pc().cast(self, target_type, safe=safe, options=options) def dictionary_encode(self, null_encoding='mask'): @@ -636,6 +652,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() options = _pc().DictionaryEncodeOptions(null_encoding) return _pc().call_function('dictionary_encode', [self], options) @@ -700,6 +717,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.type DataType(int64) """ + self._assert_cpu() cdef: vector[shared_ptr[CChunkedArray]] flattened CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) @@ -751,6 +769,7 @@ cdef class ChunkedArray(_PandasConvertible): 100 ] """ + self._assert_cpu() if self.num_chunks == 0: return array([], type=self.type) else: @@ -791,6 +810,7 @@ cdef class ChunkedArray(_PandasConvertible): 100 ] """ + self._assert_cpu() return _pc().call_function('unique', [self]) def value_counts(self): @@ -837,6 +857,7 @@ cdef class ChunkedArray(_PandasConvertible): 1 ] """ + self._assert_cpu() return _pc().call_function('value_counts', [self]) def slice(self, offset=0, length=None): @@ -959,6 +980,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() return _pc().filter(self, mask, null_selection_behavior) def index(self, value, start=None, end=None, *, memory_pool=None): @@ -1006,6 +1028,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.index(4, start=3) """ + self._assert_cpu() return _pc().index(self, value, start, end, memory_pool=memory_pool) def take(self, object indices): @@ -1052,6 +1075,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() return _pc().take(self, indices) def drop_null(self): @@ -1091,6 +1115,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() return _pc().drop_null(self) def sort(self, order="ascending", **kwargs): @@ -1110,6 +1135,7 @@ cdef class ChunkedArray(_PandasConvertible): ------- result : ChunkedArray """ + self._assert_cpu() indices = _pc().sort_indices( self, options=_pc().SortOptions(sort_keys=[("", order)], **kwargs) @@ -1209,6 +1235,7 @@ cdef class ChunkedArray(_PandasConvertible): ] ] """ + self._assert_cpu() cdef: CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) shared_ptr[CChunkedArray] c_result @@ -1333,6 +1360,7 @@ cdef class ChunkedArray(_PandasConvertible): >>> n_legs.to_pylist() [2, 2, 4, 4, None, 100] """ + self._assert_cpu() result = [] for i in range(self.num_chunks): result += self.chunk(i).to_pylist() @@ -1354,6 +1382,7 @@ cdef class ChunkedArray(_PandasConvertible): PyCapsule A capsule containing a C ArrowArrayStream struct. """ + self._assert_cpu() cdef: ChunkedArray chunked ArrowArrayStream* c_stream = NULL @@ -1410,6 +1439,20 @@ cdef class ChunkedArray(_PandasConvertible): self.init(c_chunked_array) return self + @property + def is_cpu(self): + """ + Whether all chunks in the ChunkedArray are CPU-accessible. + """ + if not self._init_is_cpu: + self._is_cpu = self.chunked_array.is_cpu() + self._init_is_cpu = True + return self._is_cpu + + def _assert_cpu(self): + if not self.is_cpu: + raise NotImplementedError("Implemented only for data on CPU device") + def chunked_array(arrays, type=None): """ diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 57765985505..c3f805b4b32 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -3385,13 +3385,13 @@ def cuda_context(): @pytest.fixture def schema(): - return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())]) + return pa.schema([pa.field('c0', pa.int32()), pa.field('c1', pa.int32())]) @pytest.fixture -def cpu_arrays(): - return [pa.array([1, 2, 3, 4, 5], pa.int16()), - pa.array([-10, -5, 0, 1, 10], pa.int32())] +def cpu_arrays(schema): + return [pa.array([1, 2, 3, 4, 5], schema.field(0).type), + pa.array([-10, -5, 0, None, 10], schema.field(1).type)] @pytest.fixture @@ -3399,6 +3399,27 @@ def cuda_arrays(cuda_context, cpu_arrays): return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays] +@pytest.fixture +def cpu_chunked_array(cpu_arrays): + chunked_array = pa.chunked_array(cpu_arrays) + assert chunked_array.is_cpu is True + return chunked_array + + +@pytest.fixture +def cuda_chunked_array(cuda_arrays): + chunked_array = pa.chunked_array(cuda_arrays) + assert chunked_array.is_cpu is False + return chunked_array + + +@pytest.fixture +def cpu_and_cuda_chunked_array(cpu_arrays, cuda_arrays): + chunked_array = pa.chunked_array(cpu_arrays + cuda_arrays) + assert chunked_array.is_cpu is False + return chunked_array + + @pytest.fixture def cpu_recordbatch(cpu_arrays, schema): return pa.record_batch(cpu_arrays, schema=schema) @@ -3409,6 +3430,147 @@ def cuda_recordbatch(cuda_context, cpu_recordbatch): return cpu_recordbatch.copy_to(cuda_context.memory_manager) +def test_chunked_array_non_cpu(cuda_context, cpu_chunked_array, cuda_chunked_array, + cpu_and_cuda_chunked_array): + # type test + assert cuda_chunked_array.type == cpu_chunked_array.type + + # length() test + assert cuda_chunked_array.length() == cpu_chunked_array.length() + + # str() test + assert str(cuda_chunked_array) == str(cpu_chunked_array) + + # repr() test + assert str(cuda_chunked_array) in repr(cuda_chunked_array) + + # validate() test + cuda_chunked_array.validate() + with pytest.raises(NotImplementedError): + cuda_chunked_array.validate(full=True) + + # null_count test + with pytest.raises(NotImplementedError): + cuda_chunked_array.null_count + + # nbytes() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.nbytes + + # get_total_buffer_size() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.get_total_buffer_size() + + # getitem() test + with pytest.raises(NotImplementedError): + cuda_chunked_array[0] + + # is_null() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.is_null() + + # is_nan() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.is_nan() + + # is_valid() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.is_valid() + + # fill_null() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.fill_null(0) + + # equals() test + with pytest.raises(NotImplementedError): + cuda_chunked_array == cuda_chunked_array + + # to_pandas() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.to_pandas() + + # to_numpy() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.to_numpy() + + # __array__() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.__array__() + + # cast() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.cast() + + # dictionary_encode() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.dictionary_encode() + + # flatten() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.flatten() + + # combine_chunks() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.combine_chunks() + + # unique() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.unique() + + # value_counts() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.value_counts() + + # filter() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.filter([True, False, True, False, True]) + + # index() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.index(5) + + # slice() test + cuda_chunked_array.slice(2, 2) + + # take() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.take([1]) + + # drop_null() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.drop_null() + + # sort() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.sort() + + # unify_dictionaries() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.unify_dictionaries() + + # num_chunks test + assert cuda_chunked_array.num_chunks == cpu_chunked_array.num_chunks + + # chunks test + assert len(cuda_chunked_array.chunks) == len(cpu_chunked_array.chunks) + + # chunk() test + chunk = cuda_chunked_array.chunk(0) + assert chunk.device_type == pa.DeviceAllocationType.CUDA + + # to_pylist() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.to_pylist() + + # __arrow_c_stream__() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.__arrow_c_stream__() + + # __reduce__() test + with pytest.raises(NotImplementedError): + cuda_chunked_array.__reduce__() + + def verify_cuda_recordbatch(batch, expected_schema): batch.validate() assert batch.device_type == pa.DeviceAllocationType.CUDA @@ -3480,8 +3642,8 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, cuda_recordbatch.sort_by('c0') # field() test - assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16()) - assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32()) + assert cuda_recordbatch.field(0) == schema.field(0) + assert cuda_recordbatch.field(1) == schema.field(1) # equals() test new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager) @@ -3551,7 +3713,8 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, # rename_columns() test new_batch = cuda_recordbatch.rename_columns(['col0', 'col1']) expected_schema = pa.schema( - [pa.field('col0', pa.int16()), pa.field('col1', pa.int32())]) + [pa.field('col0', schema.field(0).type), + pa.field('col1', schema.field(1).type)]) verify_cuda_recordbatch(new_batch, expected_schema=expected_schema) # validate() test