diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 83eeb56c496..85807eb3685 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -68,6 +68,15 @@ static inline void AdjustNonNullable(Type::type type_id, int64_t length,
 
 namespace internal {
 
+void AdjustNonNullable(ArrayData* array_data) {
+  int64_t null_count = kUnknownNullCount;
+  AdjustNonNullable(array_data->type->id(), array_data->length, &array_data->buffers,
+                    &null_count);
+  if (null_count != kUnknownNullCount) {
+    array_data->null_count.store(null_count, std::memory_order_release);
+  }
+}
+
 bool IsNullSparseUnion(const ArrayData& data, int64_t i) {
   auto* union_type = checked_cast<const SparseUnionType*>(data.type.get());
   const auto* types = reinterpret_cast<const int8_t*>(data.buffers[1]->data());
diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h
index e0508fe6980..4f30ebf24cb 100644
--- a/cpp/src/arrow/array/data.h
+++ b/cpp/src/arrow/array/data.h
@@ -36,6 +36,9 @@ namespace arrow {
 
 namespace internal {
+
+ARROW_EXPORT void AdjustNonNullable(ArrayData* array_data);
+
 // ----------------------------------------------------------------------
 // Null handling for types without a validity bitmap and the dictionary type
 
@@ -342,6 +345,17 @@ struct ARROW_EXPORT ArrayData {
   ///   ...
   /// }
   bool MayHaveLogicalNulls() const {
+    if (ARROW_PREDICT_FALSE(device_type() != DeviceAllocationType::kCPU)) {
+      // Some arrays are malformed in that they have kUnknownNullCount but no
+      // validity bitmap, which makes MayHaveLogicalNulls() return true when
+      // it should return false.
+      //
+      // That is not a problem on the CPU because ArrayData::GetNullCount()
+      // will eventually return 0, but it is a problem when we want to make
+      // cheap decisions without fully calculating the null count and defer
+      // the processing of the bitmap until later.
+      internal::AdjustNonNullable(const_cast<ArrayData*>(this));
+    }
     if (buffers[0] != NULLPTR) {
      return null_count.load() != 0;
     }
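Note (not part of the patch): the inconsistent state described in the comment above can be reproduced on the CPU with a few lines of Arrow C++. A minimal sketch, assuming an Arrow C++ build; the Demo()/main() wrappers are illustrative, only MayHaveLogicalNulls(), GetNullCount() and kUnknownNullCount come from the code being changed here.

    #include <iostream>
    #include "arrow/api.h"

    arrow::Status Demo() {
      arrow::Int32Builder builder;
      ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
      std::shared_ptr<arrow::ArrayData> data = array->data()->Copy();
      data->buffers[0] = nullptr;                   // no validity bitmap
      data->null_count = arrow::kUnknownNullCount;  // simulate the "malformed" state
      std::cout << data->MayHaveLogicalNulls() << "\n";  // 1: nulls cannot be ruled out yet
      std::cout << data->GetNullCount() << "\n";         // 0: resolved on the CPU
      std::cout << data->MayHaveLogicalNulls() << "\n";  // 0: consistent after resolution
      return arrow::Status::OK();
    }

    int main() { return Demo().ok() ? 0 : 1; }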
diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc
index 0478a3d1e80..ae431a45b54 100644
--- a/cpp/src/arrow/compute/function.cc
+++ b/cpp/src/arrow/compute/function.cc
@@ -102,13 +102,38 @@ Status Function::CheckArity(size_t num_args) const {
 namespace {
 
 Status CheckOptions(const Function& function, const FunctionOptions* options) {
-  if (options == nullptr && function.doc().options_required) {
+  if (ARROW_PREDICT_FALSE(options == nullptr && function.doc().options_required)) {
     return Status::Invalid("Function '", function.name(),
                            "' cannot be called without options");
   }
   return Status::OK();
 }
 
+Status BadDeviceTypeStatus(const std::string& func_name,
+                           DeviceAllocationTypeSet expected_device_type_set,
+                           int offending_arg_index, const Datum& offending_arg) {
+  std::string ordinal;
+  switch (offending_arg_index) {
+    case 0:
+      ordinal = "1st";
+      break;
+    case 1:
+      ordinal = "2nd";
+      break;
+    case 2:
+      ordinal = "3rd";
+      break;
+    default:
+      ordinal = std::to_string(offending_arg_index + 1);
+      ordinal += "th";
+      break;
+  }
+  return Status::NotImplemented("'", func_name, "' expects ", ordinal,
+                                " argument's device allocation type(s) to be in ",
+                                expected_device_type_set.ToString(), " but got ",
+                                offending_arg.device_types().ToString(), ".");
+}
+
 }  // namespace
 
 namespace detail {
@@ -248,6 +273,15 @@ struct FunctionExecutorImpl : public FunctionExecutor {
       }
       args_with_cast[i] = std::move(arg);
     }
+    {
+      DeviceAllocationTypeSet expected_device_type_set;
+      int offending_arg_index = 0;
+      if (ARROW_PREDICT_FALSE(!kernel->signature->MatchesDeviceAllocationTypes(
+              args_with_cast, &expected_device_type_set, &offending_arg_index))) {
+        return BadDeviceTypeStatus(func_name, expected_device_type_set,
+                                   offending_arg_index, args[offending_arg_index]);
+      }
+    }
 
     detail::DatumAccumulator listener;
diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc
index 5e7461cc52d..9850d81d71f 100644
--- a/cpp/src/arrow/compute/kernel.cc
+++ b/cpp/src/arrow/compute/kernel.cc
@@ -23,6 +23,7 @@
 #include <string>
 
 #include "arrow/buffer.h"
+#include "arrow/chunked_array.h"
 #include "arrow/compute/exec.h"
 #include "arrow/device_allocation_type_set.h"
 #include "arrow/result.h"
@@ -431,6 +432,25 @@ bool InputType::Matches(const Datum& value) const {
   return Matches(*value.type());
 }
 
+bool InputType::MatchesDeviceAllocationType(const Datum& value) const {
+  DCHECK(Matches(value));
+  switch (value.kind()) {
+    case Datum::NONE:
+    case Datum::RECORD_BATCH:
+    case Datum::TABLE:
+      break;
+    case Datum::ARRAY:
+      return accepted_device_types_.contains(value.array()->device_type());
+    case Datum::CHUNKED_ARRAY:
+      return accepted_device_types_.Contains(value.chunked_array()->device_types());
+    case Datum::SCALAR:
+      // Scalars are assumed to always reside in CPU memory for now.
+      return accepted_device_types_.contains(DeviceAllocationType::kCPU);
+  }
+  DCHECK(false) << "MatchesDeviceAllocationType expects ARRAY, CHUNKED_ARRAY or SCALAR";
+  return false;
+}
+
 const std::shared_ptr<DataType>& InputType::type() const {
   DCHECK_EQ(InputType::EXACT_TYPE, kind_);
   return type_;
@@ -529,6 +549,50 @@ bool KernelSignature::MatchesInputs(const std::vector<TypeHolder>& types) const {
   return true;
 }
 
+bool KernelSignature::MatchesDeviceAllocationTypes(
+    const std::vector<Datum>& args, DeviceAllocationTypeSet* out_expected_device_types,
+    int* out_offending_arg_index) const {
+  DeviceAllocationTypeSet expected_device_types;
+  int offending_arg_index = 0;
+  bool matches = true;
+  if (is_varargs_) {
+    for (size_t i = 0; i < args.size(); ++i) {
+      auto& param_type = in_types_[std::min(i, in_types_.size() - 1)];
+      DCHECK(param_type.Matches(*args[i].type()));
+      if (!param_type.MatchesDeviceAllocationType(args[i])) {
+        matches = false;
+        expected_device_types = param_type.accepted_device_types();
+        offending_arg_index = static_cast<int>(i);
+        break;
+      }
+    }
+  } else {
+    DCHECK(args.size() == in_types_.size());
+    if (args.size() != in_types_.size()) {
+      matches = false;
+    } else {
+      for (size_t i = 0; i < in_types_.size(); ++i) {
+        auto& param_type = in_types_[i];
+        DCHECK(param_type.Matches(*args[i].type()));
+        if (!param_type.MatchesDeviceAllocationType(args[i])) {
+          matches = false;
+          offending_arg_index = static_cast<int>(i);
+          expected_device_types = param_type.accepted_device_types();
+          break;
+        }
+      }
+    }
+  }
+
+  if (out_expected_device_types) {
+    *out_expected_device_types = expected_device_types;
+  }
+  if (out_offending_arg_index) {
+    *out_offending_arg_index = offending_arg_index;
+  }
+  return matches;
+}
+
 size_t KernelSignature::Hash() const {
   if (hash_code_ != 0) {
     return hash_code_;
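Note (not part of the patch): a minimal sketch of driving the new KernelSignature::MatchesDeviceAllocationTypes() entry point directly, assuming an Arrow C++ build; the main() wrapper and includes are assumptions. A default-constructed InputType is CPU-only and scalars are treated as CPU-resident, so the check passes here.

    #include <iostream>
    #include <vector>
    #include "arrow/api.h"
    #include "arrow/compute/kernel.h"

    int main() {
      using arrow::compute::InputType;
      using arrow::compute::KernelSignature;
      using arrow::compute::OutputType;

      // int32 -> int32 signature; the default InputType accepts CPU-resident values only.
      auto sig = KernelSignature::Make({InputType(arrow::int32())},
                                       OutputType(arrow::int32()));

      // A scalar argument counts as CPU-resident for the new device check.
      std::vector<arrow::Datum> args = {arrow::Datum(int32_t{42})};

      arrow::DeviceAllocationTypeSet expected;
      int offending = 0;
      bool ok = sig->MatchesDeviceAllocationTypes(args, &expected, &offending);
      std::cout << std::boolalpha << ok << "\n";  // true
      return 0;
    }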
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index cfa1cd8193f..cef6fdf6f09 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -29,6 +29,7 @@
 #include <vector>
 
 #include "arrow/buffer.h"
+#include "arrow/chunked_array.h"  // for DeviceAllocationTypeSet
 #include "arrow/compute/exec.h"
 #include "arrow/datum.h"
 #include "arrow/device_allocation_type_set.h"
@@ -194,16 +195,37 @@ class ARROW_EXPORT InputType {
     USE_TYPE_MATCHER
   };
 
-  /// \brief Accept any value type
-  InputType() : kind_(ANY_TYPE) {}
-
-  /// \brief Accept an exact value type.
+  /// \brief Accept any value type as long as the values are allocated on the
+  /// given set of device types.
+  explicit InputType(DeviceAllocationTypeSet accepted_devices)
+      : kind_(ANY_TYPE), accepted_device_types_(std::move(accepted_devices)) {}
+
+  /// \brief Accept an exact value type as long as the values are allocated on
+  /// the given set of device types.
+  InputType(std::shared_ptr<DataType> type, DeviceAllocationTypeSet accepted_devices)
+      : kind_(EXACT_TYPE),
+        type_(std::move(type)),
+        accepted_device_types_(accepted_devices) {}
+
+  /// \brief Use the passed TypeMatcher to type check arguments whose values are
+  /// allocated on the given set of device types.
+  InputType(std::shared_ptr<TypeMatcher> type_matcher,
+            DeviceAllocationTypeSet accepted_devices)
+      : kind_(USE_TYPE_MATCHER),
+        type_matcher_(std::move(type_matcher)),
+        accepted_device_types_(std::move(accepted_devices)) {}
+
+  /// \brief Accept any value type as long as the values are allocated on the CPU.
+  InputType() : InputType(DeviceAllocationTypeSet::CpuOnly()) {}
+
+  /// \brief Accept an exact value type as long as the values are allocated on the CPU.
   InputType(std::shared_ptr<DataType> type)  // NOLINT implicit construction
-      : kind_(EXACT_TYPE), type_(std::move(type)) {}
+      : InputType(std::move(type), DeviceAllocationTypeSet::CpuOnly()) {}
 
-  /// \brief Use the passed TypeMatcher to type check.
+  /// \brief Use the passed TypeMatcher to type check arguments whose values are
+  /// allocated on the CPU.
   InputType(std::shared_ptr<TypeMatcher> type_matcher)  // NOLINT implicit construction
-      : kind_(USE_TYPE_MATCHER), type_matcher_(std::move(type_matcher)) {}
+      : InputType(std::move(type_matcher), DeviceAllocationTypeSet::CpuOnly()) {}
 
   /// \brief Match any type with the given Type::type. Uses a TypeMatcher for
   /// its implementation.
@@ -242,6 +264,13 @@ class ARROW_EXPORT InputType {
   /// \brief Return true if the type matches this InputType
   bool Matches(const DataType& type) const;
 
+  /// \brief Return true if the Datum's device allocation type matches this
+  /// argument's accepted device allocation type set (only scalar and
+  /// array-like Datums are allowed).
+  ///
+  /// \pre Matches(value) == true
+  bool MatchesDeviceAllocationType(const Datum& value) const;
+
   /// \brief The type matching rule that this InputType uses.
   Kind kind() const { return kind_; }
 
@@ -255,26 +284,32 @@ class ARROW_EXPORT InputType {
   /// and will assert in debug builds.
   const TypeMatcher& type_matcher() const;
 
+  /// \brief The device allocation types that are accepted for this input type.
+  DeviceAllocationTypeSet accepted_device_types() const { return accepted_device_types_; }
+
  private:
   void CopyInto(const InputType& other) {
     this->kind_ = other.kind_;
     this->type_ = other.type_;
     this->type_matcher_ = other.type_matcher_;
+    this->accepted_device_types_ = other.accepted_device_types_;
   }
 
   void MoveInto(InputType&& other) {
     this->kind_ = other.kind_;
     this->type_ = std::move(other.type_);
     this->type_matcher_ = std::move(other.type_matcher_);
+    this->accepted_device_types_ = std::move(other.accepted_device_types_);
   }
 
   Kind kind_;
 
-  // For EXACT_TYPE Kind
   std::shared_ptr<DataType> type_;
-  // For USE_TYPE_MATCHER Kind
   std::shared_ptr<TypeMatcher> type_matcher_;
+
+  // For matching device types. CPU-only by default.
+  DeviceAllocationTypeSet accepted_device_types_;
 };
 
 /// \brief Container to capture both exact and input-dependent output types.
@@ -368,6 +403,15 @@ class ARROW_EXPORT KernelSignature {
   /// value descriptors.
   bool MatchesInputs(const std::vector<TypeHolder>& types) const;
 
+  /// \brief Return true if the signature is compatible with the device
+  /// allocation types of the given execution arguments.
+  ///
+  /// \pre MatchesInputs(GetTypes(args)) == true
+  bool MatchesDeviceAllocationTypes(
+      const std::vector<Datum>& args,
+      DeviceAllocationTypeSet* out_expected_device_types = NULLPTR,
+      int* out_offending_arg_index = NULLPTR) const;
+
   /// \brief Returns true if the input types of each signature are
   /// equal. Well-formed functions should have a deterministic output type
   /// given input types, but currently it is the responsibility of the
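Note (not part of the patch): a rough sketch of how a kernel author could opt in to non-CPU inputs with the widened InputType constructors above. The add() call on DeviceAllocationTypeSet is an assumption (only CpuOnly(), contains(), Contains() and ToString() appear in this diff), and the exact ToString() output format is not guaranteed here.

    #include <iostream>
    #include "arrow/compute/kernel.h"
    #include "arrow/device.h"
    #include "arrow/device_allocation_type_set.h"
    #include "arrow/type.h"

    int main() {
      using arrow::DeviceAllocationType;
      using arrow::DeviceAllocationTypeSet;
      using arrow::compute::InputType;

      // Default behaviour: an int32 argument whose values live on the CPU.
      InputType cpu_only(arrow::int32());

      // Explicit device set: int32 values resident on either CPU or CUDA devices.
      DeviceAllocationTypeSet devices = DeviceAllocationTypeSet::CpuOnly();
      devices.add(DeviceAllocationType::kCUDA);  // add() assumed to exist
      InputType cpu_or_cuda(arrow::int32(), devices);

      std::cout << cpu_only.accepted_device_types().ToString() << "\n";     // e.g. {CPU}
      std::cout << cpu_or_cuda.accepted_device_types().ToString() << "\n";  // CPU and CUDA
      return 0;
    }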
diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc
index b265673e23c..0d686e6db3d 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -54,6 +54,7 @@ using internal::BitBlockCounter;
 using internal::CheckIndexBounds;
 using internal::CopyBitmap;
 using internal::CountSetBits;
+using internal::may_have_validity_bitmap;
 using internal::OptionalBitBlockCounter;
 using internal::OptionalBitIndexer;
 
@@ -68,32 +69,50 @@ using TakeState = OptionsWrapper<TakeOptions>;
 // ----------------------------------------------------------------------
 // DropNull Implementation
 
+/// \pre values.MayHaveNulls()
+/// \pre may_have_validity_bitmap(values_type_id)
 std::shared_ptr<Array> MakeDropNullFilter(const Array& values) {
+  DCHECK(may_have_validity_bitmap(values.type()->id()));
   auto& bitmap_buffer = values.null_bitmap();
+  DCHECK(bitmap_buffer);
   return std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0,
                                         values.offset());
 }
 
 Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values,
                                              ExecContext* ctx) {
-  if (values->null_count() == 0) {
+  if (!values->data()->MayHaveLogicalNulls()) {
     return values;
   }
-  if (values->null_count() == values->length()) {
-    return MakeEmptyArray(values->type(), ctx->memory_pool());
-  }
-  if (values->type()->id() == Type::type::NA) {
+  if (ARROW_PREDICT_FALSE(values->type()->id() == Type::type::NA)) {
     return std::make_shared<NullArray>(0);
   }
+  if (values->device_type() == DeviceAllocationType::kCPU) {
+    const auto null_count = values->null_count();
+    if (null_count == 0) {
+      return values;
+    }
+    if (null_count == values->length()) {
+      return MakeEmptyArray(values->type(), ctx->memory_pool());
+    }
+  }
+  // TODO(GH-43851): Handle logical nulls in REE and Union arrays correctly
+  if (!may_have_validity_bitmap(values->type()->id())) {
+    return values;
+  }
   auto drop_null_filter = Datum{MakeDropNullFilter(*values)};
   return Filter(values, drop_null_filter, FilterOptions::Defaults(), ctx);
 }
 
 Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(
     const std::shared_ptr<ChunkedArray>& values, ExecContext* ctx) {
-  if (values->null_count() == 0) {
+  // The null count in a chunked array is calculated by summing the null counts
+  // of all chunks, so by this point it's safe to access it without triggering
+  // access to potentially non-CPU data.
+  auto null_count = values->null_count();
+  if (null_count == 0) {
     return values;
   }
-  if (values->null_count() == values->length()) {
+  if (null_count == values->length()) {
     return ChunkedArray::MakeEmpty(values->type(), ctx->memory_pool());
   }
   std::vector<std::shared_ptr<Array>> new_chunks;
""" - self._assert_cpu() return _pc().call_function('unique', [self]) def dictionary_encode(self, null_encoding='mask'): @@ -1070,7 +1067,6 @@ cdef class Array(_PandasConvertible): encoded : DictionaryArray A dictionary-encoded version of this array. """ - self._assert_cpu() options = _pc().DictionaryEncodeOptions(null_encoding) return _pc().call_function('dictionary_encode', [self], options) @@ -1083,7 +1079,6 @@ cdef class Array(_PandasConvertible): StructArray An array of structs """ - self._assert_cpu() return _pc().call_function('value_counts', [self]) @staticmethod @@ -1358,7 +1353,6 @@ cdef class Array(_PandasConvertible): ------- array : boolean Array """ - self._assert_cpu() options = _pc().NullOptions(nan_is_null=nan_is_null) return _pc().call_function('is_null', [self], options) @@ -1370,14 +1364,12 @@ cdef class Array(_PandasConvertible): ------- array : boolean Array """ - self._assert_cpu() return _pc().call_function('is_nan', [self]) def is_valid(self): """ Return BooleanArray indicating the non-null values. """ - self._assert_cpu() return _pc().is_valid(self) def fill_null(self, fill_value): @@ -1394,7 +1386,6 @@ cdef class Array(_PandasConvertible): result : Array A new array with nulls replaced by the given value. """ - self._assert_cpu() return _pc().fill_null(self, fill_value) def __getitem__(self, key): @@ -1468,14 +1459,12 @@ cdef class Array(_PandasConvertible): taken : Array An array with the same datatype, containing the taken values. """ - self._assert_cpu() return _pc().take(self, indices) def drop_null(self): """ Remove missing values from an array. """ - self._assert_cpu() return _pc().drop_null(self) def filter(self, object mask, *, null_selection_behavior='drop'): @@ -1497,7 +1486,6 @@ cdef class Array(_PandasConvertible): An array of the same type, with only the elements selected by the boolean mask. """ - self._assert_cpu() return _pc().filter(self, mask, null_selection_behavior=null_selection_behavior) @@ -1523,7 +1511,6 @@ cdef class Array(_PandasConvertible): index : Int64Scalar The index of the value in the array (-1 if not found). 
""" - self._assert_cpu() return _pc().index(self, value, start, end, memory_pool=memory_pool) def sort(self, order="ascending", **kwargs): @@ -1543,7 +1530,6 @@ cdef class Array(_PandasConvertible): ------- result : Array """ - self._assert_cpu() indices = _pc().sort_indices( self, options=_pc().SortOptions(sort_keys=[("", order)], **kwargs) @@ -1555,8 +1541,6 @@ cdef class Array(_PandasConvertible): return _array_like_to_pandas(self, options, types_mapper=types_mapper) def __array__(self, dtype=None, copy=None): - self._assert_cpu() - if copy is False: try: values = self.to_numpy(zero_copy_only=True) @@ -1646,7 +1630,6 @@ cdef class Array(_PandasConvertible): ------- lst : list """ - self._assert_cpu() return [x.as_py() for x in self] def tolist(self): diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index c44ec3f8e1a..3d203c62de0 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -22,6 +22,7 @@ import hypothesis.strategies as st import itertools import pytest +import re import struct import subprocess import sys @@ -4052,13 +4053,22 @@ def test_non_cpu_array(): ctx = cuda.Context(0) data = np.arange(4, dtype=np.int32) - validity = np.array([True, False, True, False], dtype=np.bool_) + validity_data = np.array([True, False, True, False], dtype=np.bool_) + bool_data = np.array([True, False, True, False], dtype=np.bool_) + cuda_data_buf = ctx.buffer_from_data(data) - cuda_validity_buf = ctx.buffer_from_data(validity) + cuda_validity_buf = ctx.buffer_from_data(validity_data) + cuda_bool_buf = ctx.buffer_from_data(bool_data) + arr = pa.Array.from_buffers(pa.int32(), 4, [None, cuda_data_buf]) arr2 = pa.Array.from_buffers(pa.int32(), 4, [None, cuda_data_buf]) + arr_with_nulls = pa.Array.from_buffers( pa.int32(), 4, [cuda_validity_buf, cuda_data_buf]) + arr_bool = pa.Array.from_buffers(pa.bool_(), 4, [cuda_validity_buf, cuda_bool_buf]) + # REE array using arr as run-ends and arr_bool as values + arr_ree = pa.Array.from_buffers(pa.run_end_encoded(pa.int32(), pa.bool_()), 6, + [None], 0, 0, [arr, arr_bool]) # Supported arr.validate() @@ -4077,20 +4087,35 @@ def test_non_cpu_array(): with pytest.raises(NotImplementedError): arr.__dlpack_device__() + def bad_device_msg(func_name, arg_index=0): + if arg_index == 0: + ordinal = "1st" + elif arg_index == 1: + ordinal = "2nd" + elif arg_index == 2: + ordinal = "3rd" + else: + ordinal = (arg_index + 1) + "th" + pattern = f"'(array_)?{func_name}(_.+)?'" + pattern += re.escape(f" expects {ordinal} argument's device allocation " + f"types(s) to be in {'{CPU}'} but got {'{CUDA}'}.") + return pattern + # Not Supported with pytest.raises(NotImplementedError): arr.diff(arr2) - with pytest.raises(NotImplementedError): + with pytest.raises(NotImplementedError, match=bad_device_msg("cast")): arr.cast(pa.int64()) with pytest.raises(NotImplementedError): arr.view(pa.int64()) - with pytest.raises(NotImplementedError): + with pytest.raises(NotImplementedError, match=bad_device_msg("sum")): arr.sum() - with pytest.raises(NotImplementedError): + with pytest.raises(NotImplementedError, match=bad_device_msg("unique")): arr.unique() - with pytest.raises(NotImplementedError): + with pytest.raises(NotImplementedError, + match=bad_device_msg("dictionary_encode")): arr.dictionary_encode() - with pytest.raises(NotImplementedError): + with pytest.raises(NotImplementedError, match=bad_device_msg("value_counts")): arr.value_counts() with pytest.raises(NotImplementedError): arr_with_nulls.null_count @@ 
@@ -4102,25 +4127,36 @@ def test_non_cpu_array():
         [i for i in iter(arr)]
     with pytest.raises(NotImplementedError):
         arr == arr2
-    with pytest.raises(NotImplementedError):
+    with pytest.raises(NotImplementedError, match=bad_device_msg("is_null", 0)):
         arr.is_null()
-    with pytest.raises(NotImplementedError):
+    with pytest.raises(NotImplementedError, match=bad_device_msg("is_nan", 0)):
         arr.is_nan()
-    with pytest.raises(NotImplementedError):
+    with pytest.raises(NotImplementedError, match=bad_device_msg("is_valid", 0)):
         arr.is_valid()
-    with pytest.raises(NotImplementedError):
+    with pytest.raises(NotImplementedError, match=bad_device_msg("coalesce", 0)):
         arr.fill_null(0)
     with pytest.raises(NotImplementedError):
         arr[0]
     with pytest.raises(NotImplementedError):
+        arr[1:2]
+    with pytest.raises(NotImplementedError, match=bad_device_msg("take", 0)):
         arr.take([0])
-    with pytest.raises(NotImplementedError):
-        arr.drop_null()
-    with pytest.raises(NotImplementedError):
+    with pytest.raises(NotImplementedError, match=bad_device_msg("take", 1)):
+        pa.array([0, 1]).take(arr)
+    with pytest.raises(NotImplementedError, match=bad_device_msg("filter", 0)):
+        arr_with_nulls.drop_null()
+    # TODO(GH-11399): drop_null currently does nothing on union and REE arrays.
+    # This invocation here guarantees that once that issue is fixed, this test
+    # will crash unless the code is careful to not access CUDA memory from CPU.
+    arr_ree.drop_null()
+    with pytest.raises(NotImplementedError, match=bad_device_msg("filter", 0)):
         arr.filter([True, True, False, False])
-    with pytest.raises(NotImplementedError):
+    with pytest.raises(NotImplementedError, match=bad_device_msg("filter", 1)):
+        pa.array([0, 1]).filter(arr_bool)
+    with pytest.raises(NotImplementedError, match="index"):
         arr.index(0)
-    with pytest.raises(NotImplementedError):
+    with pytest.raises(NotImplementedError,
+                       match=bad_device_msg("sort_indices", 0)):
         arr.sort()
     with pytest.raises(NotImplementedError):
         arr.__array__()