Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cpp/src/arrow/array/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ static inline void AdjustNonNullable(Type::type type_id, int64_t length,

namespace internal {

void AdjustNonNullable(ArrayData* array_data) {
int64_t null_count = kUnknownNullCount;
AdjustNonNullable(array_data->type->id(), array_data->length, &array_data->buffers,
&null_count);
if (null_count != kUnknownNullCount) {
array_data->null_count.store(null_count, std::memory_order_release);
}
}

bool IsNullSparseUnion(const ArrayData& data, int64_t i) {
auto* union_type = checked_cast<const SparseUnionType*>(data.type.get());
const auto* types = reinterpret_cast<const int8_t*>(data.buffers[1]->data());
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/array/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
namespace arrow {

namespace internal {

ARROW_EXPORT void AdjustNonNullable(ArrayData* array_data);

// ----------------------------------------------------------------------
// Null handling for types without a validity bitmap and the dictionary type

Expand Down Expand Up @@ -342,6 +345,17 @@ struct ARROW_EXPORT ArrayData {
/// ...
/// }
bool MayHaveLogicalNulls() const {
if (ARROW_PREDICT_FALSE(device_type() != DeviceAllocationType::kCPU)) {
// Some arrays are malformed in that they have kUnknownNullCount, but no
// validity bitmap and that makes MayHaveLogicalNulls() return true when
// it should return false.
//
// Not a problem if we're on the CPU because ArrayData::GetNullCount()
// will eventually return 0. But it's a problem if we're trying to make
// cheap decisions without fully calculating the null count and want to
// defer the processing of the bitmap to later.
internal::AdjustNonNullable(const_cast<ArrayData*>(this));
}
if (buffers[0] != NULLPTR) {
return null_count.load() != 0;
}
Expand Down
36 changes: 35 additions & 1 deletion cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,38 @@ Status Function::CheckArity(size_t num_args) const {
namespace {

Status CheckOptions(const Function& function, const FunctionOptions* options) {
if (options == nullptr && function.doc().options_required) {
if (ARROW_PREDICT_FALSE(options == nullptr && function.doc().options_required)) {
return Status::Invalid("Function '", function.name(),
"' cannot be called without options");
}
return Status::OK();
}

Status BadDeviceTypeStatus(const std::string& func_name,
DeviceAllocationTypeSet expected_device_type_set,
int offending_arg_index, const Datum& offending_arg) {
std::string ordinal;
switch (offending_arg_index) {
case 0:
ordinal = "1st";
break;
case 1:
ordinal = "2nd";
break;
case 2:
ordinal = "3rd";
break;
default:
ordinal = std::to_string(offending_arg_index + 1);
ordinal += "th";
break;
}
return Status::NotImplemented("'", func_name, "' expects ", ordinal,
" argument's device allocation types(s) to be in ",
expected_device_type_set.ToString(), " but got ",
offending_arg.device_types().ToString(), ".");
}

} // namespace

namespace detail {
Expand Down Expand Up @@ -248,6 +273,15 @@ struct FunctionExecutorImpl : public FunctionExecutor {
}
args_with_cast[i] = std::move(arg);
}
{
DeviceAllocationTypeSet expected_device_type_set;
int offending_arg_index = 0;
if (ARROW_PREDICT_FALSE(!kernel->signature->MatchesDeviceAllocationTypes(
args_with_cast, &expected_device_type_set, &offending_arg_index))) {
return BadDeviceTypeStatus(func_name, expected_device_type_set,
offending_arg_index, args[offending_arg_index]);
}
}

detail::DatumAccumulator listener;

Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/compute/kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string>

#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/exec.h"
#include "arrow/device_allocation_type_set.h"
#include "arrow/result.h"
Expand Down Expand Up @@ -431,6 +432,25 @@ bool InputType::Matches(const Datum& value) const {
return Matches(*value.type());
}

bool InputType::MatchesDeviceAllocationType(const Datum& value) const {
DCHECK(Matches(value));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will probably remove this DCHECK and keep the ones in KernelSignature::MatchesDeviceAllocationTypes.

switch (value.kind()) {
case Datum::NONE:
case Datum::RECORD_BATCH:
case Datum::TABLE:
Comment on lines +439 to +440
Copy link
Contributor Author

@felipecrv felipecrv Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can probably handle these even though they are not passed here.

break;
case Datum::ARRAY:
return accepted_device_types_.contains(value.array()->device_type());
case Datum::CHUNKED_ARRAY:
return accepted_device_types_.Contains(value.chunked_array()->device_types());
case Datum::SCALAR:
// Scalars are asssumed as always residing in CPU memory for now.
return accepted_device_types_.contains(DeviceAllocationType::kCPU);
}
DCHECK(false) << "MatchesDeviceAllocationType expects ARRAY, CHUNKED_ARRAY or SCALAR";
return false;
}

const std::shared_ptr<DataType>& InputType::type() const {
DCHECK_EQ(InputType::EXACT_TYPE, kind_);
return type_;
Expand Down Expand Up @@ -529,6 +549,50 @@ bool KernelSignature::MatchesInputs(const std::vector<TypeHolder>& types) const
return true;
}

bool KernelSignature::MatchesDeviceAllocationTypes(
const std::vector<Datum>& args, DeviceAllocationTypeSet* out_expected_device_types,
int* out_offending_arg_index) const {
DeviceAllocationTypeSet expected_device_types;
int offending_arg_index = 0;
bool matches = true;
if (is_varargs_) {
for (size_t i = 0; i < args.size(); ++i) {
auto& param_type = in_types_[std::min(i, in_types_.size() - 1)];
DCHECK(param_type.Matches(*args[i].type()));
if (!param_type.MatchesDeviceAllocationType(args[i])) {
matches = false;
expected_device_types = param_type.accepted_device_types();
offending_arg_index = static_cast<int>(i);
break;
}
}
} else {
DCHECK(args.size() == in_types_.size());
if (args.size() != in_types_.size()) {
matches = false;
} else {
for (size_t i = 0; i < in_types_.size(); ++i) {
auto& param_type = in_types_[i];
DCHECK(param_type.Matches(*args[i].type()));
if (!param_type.MatchesDeviceAllocationType(args[i])) {
matches = false;
offending_arg_index = static_cast<int>(i);
expected_device_types = param_type.accepted_device_types();
break;
}
}
}
}

if (out_expected_device_types) {
*out_expected_device_types = expected_device_types;
}
if (out_offending_arg_index) {
*out_offending_arg_index = offending_arg_index;
}
return matches;
}

size_t KernelSignature::Hash() const {
if (hash_code_ != 0) {
return hash_code_;
Expand Down
62 changes: 53 additions & 9 deletions cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <vector>

#include "arrow/buffer.h"
#include "arrow/chunked_array.h" // for DeviceAllocationTypeSet
#include "arrow/compute/exec.h"
#include "arrow/datum.h"
#include "arrow/device_allocation_type_set.h"
Expand Down Expand Up @@ -194,16 +195,37 @@ class ARROW_EXPORT InputType {
USE_TYPE_MATCHER
};

/// \brief Accept any value type
InputType() : kind_(ANY_TYPE) {}

/// \brief Accept an exact value type.
/// \brief Accept any value type of values allocated on the given set of device
/// types.
explicit InputType(DeviceAllocationTypeSet accepted_devices)
: kind_(ANY_TYPE), accepted_device_types_(std::move(accepted_devices)) {}

/// \brief Accept an exact value type of values allocated on the given set of device
/// types.
InputType(std::shared_ptr<DataType> type, DeviceAllocationTypeSet accepted_devices)
: kind_(EXACT_TYPE),
type_(std::move(type)),
accepted_device_types_(accepted_devices) {}

/// \brief Use the passed TypeMatcher to check the type of arguments allocated on the
/// given set of device types.
InputType(std::shared_ptr<TypeMatcher> type_matcher,
DeviceAllocationTypeSet accepted_devices)
: kind_(USE_TYPE_MATCHER),
type_matcher_(std::move(type_matcher)),
accepted_device_types_(std::move(accepted_devices)) {}

/// \brief Accept any value type of values allocated on the CPU.
InputType() : InputType(DeviceAllocationTypeSet::CpuOnly()) {}

/// \brief Accept an exact value type of values allocated on the CPU.
InputType(std::shared_ptr<DataType> type) // NOLINT implicit construction
: kind_(EXACT_TYPE), type_(std::move(type)) {}
: InputType(std::move(type), DeviceAllocationTypeSet::CpuOnly()) {}

/// \brief Use the passed TypeMatcher to type check.
/// \brief Use the passed TypeMatcher to type check the type of arguments allocated on
/// the CPU.
InputType(std::shared_ptr<TypeMatcher> type_matcher) // NOLINT implicit construction
: kind_(USE_TYPE_MATCHER), type_matcher_(std::move(type_matcher)) {}
: InputType(std::move(type_matcher), DeviceAllocationTypeSet::CpuOnly()) {}

/// \brief Match any type with the given Type::type. Uses a TypeMatcher for
/// its implementation.
Expand Down Expand Up @@ -242,6 +264,13 @@ class ARROW_EXPORT InputType {
/// \brief Return true if the type matches this InputType
bool Matches(const DataType& type) const;

/// \brief Return true if the Datum's device allocation type matches this
/// argument's accepted device allocation type set (and only allows scalar or
/// array-like Datums).
///
/// \pre Matches(value) == true
bool MatchesDeviceAllocationType(const Datum& value) const;

/// \brief The type matching rule that this InputType uses.
Kind kind() const { return kind_; }

Expand All @@ -255,26 +284,32 @@ class ARROW_EXPORT InputType {
/// and will assert in debug builds.
const TypeMatcher& type_matcher() const;

/// \brief The device allocation types that are accepted for this input type.
DeviceAllocationTypeSet accepted_device_types() const { return accepted_device_types_; }

private:
void CopyInto(const InputType& other) {
this->kind_ = other.kind_;
this->type_ = other.type_;
this->type_matcher_ = other.type_matcher_;
this->accepted_device_types_ = other.accepted_device_types_;
}

void MoveInto(InputType&& other) {
this->kind_ = other.kind_;
this->type_ = std::move(other.type_);
this->type_matcher_ = std::move(other.type_matcher_);
this->accepted_device_types_ = std::move(other.accepted_device_types_);
}

Kind kind_;

// For EXACT_TYPE Kind
std::shared_ptr<DataType> type_;

// For USE_TYPE_MATCHER Kind
std::shared_ptr<TypeMatcher> type_matcher_;

// For matching device types. CPU-only by default.
DeviceAllocationTypeSet accepted_device_types_;
};

/// \brief Container to capture both exact and input-dependent output types.
Expand Down Expand Up @@ -368,6 +403,15 @@ class ARROW_EXPORT KernelSignature {
/// value descriptors.
bool MatchesInputs(const std::vector<TypeHolder>& types) const;

/// \brief Return true if the signature is compatible with the list of
/// execution arguments regarding device allocation types.
///
/// \pre MatchesInputs(GetTypes(args)) == true
bool MatchesDeviceAllocationTypes(
const std::vector<Datum>& args,
DeviceAllocationTypeSet* out_expected_device_types = NULLPTR,
int* offending_arg_index = NULLPTR) const;

/// \brief Returns true if the input types of each signature are
/// equal. Well-formed functions should have a deterministic output type
/// given input types, but currently it is the responsibility of the
Expand Down
33 changes: 26 additions & 7 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ using internal::BitBlockCounter;
using internal::CheckIndexBounds;
using internal::CopyBitmap;
using internal::CountSetBits;
using internal::may_have_validity_bitmap;
using internal::OptionalBitBlockCounter;
using internal::OptionalBitIndexer;

Expand All @@ -68,32 +69,50 @@ using TakeState = OptionsWrapper<TakeOptions>;
// ----------------------------------------------------------------------
// DropNull Implementation

/// \pre values.MayHaveNulls()
/// \pre may_have_validity_bitmap(values_type_id)
std::shared_ptr<arrow::BooleanArray> MakeDropNullFilter(const Array& values) {
DCHECK(may_have_validity_bitmap(values.type()->id()));
auto& bitmap_buffer = values.null_bitmap();
DCHECK(bitmap_buffer);
return std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0,
values.offset());
}

Result<Datum> DropNullArray(const std::shared_ptr<Array>& values, ExecContext* ctx) {
if (values->null_count() == 0) {
if (!values->data()->MayHaveLogicalNulls()) {
return values;
}
if (values->null_count() == values->length()) {
return MakeEmptyArray(values->type(), ctx->memory_pool());
}
if (values->type()->id() == Type::type::NA) {
if (ARROW_PREDICT_FALSE(values->type()->id() == Type::type::NA)) {
return std::make_shared<NullArray>(0);
}
if (values->device_type() == DeviceAllocationType::kCPU) {
const auto null_count = values->null_count();
if (null_count == 0) {
return values;
}
if (null_count == values->length()) {
return MakeEmptyArray(values->type(), ctx->memory_pool());
}
}
// TODO(GH-43851): Handle logical nulls in REE and Unions arrays correctly
if (!may_have_validity_bitmap(values->type()->id())) {
return values;
}
auto drop_null_filter = Datum{MakeDropNullFilter(*values)};
return Filter(values, drop_null_filter, FilterOptions::Defaults(), ctx);
}

Result<Datum> DropNullChunkedArray(const std::shared_ptr<ChunkedArray>& values,
ExecContext* ctx) {
if (values->null_count() == 0) {
// The null count in a chunked array is calculated by summing the null counts
// of all chunks, so by this point it's safe to access it without triggering
// access to potentially non-CPU data.
auto null_count = values->null_count();
if (null_count == 0) {
return values;
}
if (values->null_count() == values->length()) {
if (null_count == values->length()) {
return ChunkedArray::MakeEmpty(values->type(), ctx->memory_pool());
}
std::vector<std::shared_ptr<Array>> new_chunks;
Expand Down
Loading