Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 99 additions & 6 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,28 @@ struct SumState {
}
};

template <>
struct SumState<BooleanType> {
using SumType = typename FindAccumulatorType<BooleanType>::Type;
using ThisType = SumState<BooleanType, SumType>;

ThisType& operator+=(const ThisType& rhs) {
this->count += rhs.count;
this->sum += rhs.sum;
return *this;
}

public:
void Consume(const Array& input) {
const BooleanArray& array = static_cast<const BooleanArray&>(input);
count += array.length() - array.null_count();
sum += array.true_count();
}

size_t count = 0;
typename SumType::c_type sum = 0;
};

template <typename ArrowType>
struct SumImpl : public ScalarAggregator {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
Expand Down Expand Up @@ -311,6 +333,11 @@ struct SumLikeInit {
return Status::NotImplemented("No sum implemented");
}

// Boolean input: install a KernelClass instantiated for BooleanType as the
// kernel state and report success.
Status Visit(const BooleanType&) {
state.reset(new KernelClass<BooleanType>());
return Status::OK();
}

template <typename Type>
enable_if_number<Type, Status> Visit(const Type&) {
state.reset(new KernelClass<Type>());
Expand Down Expand Up @@ -339,13 +366,38 @@ std::unique_ptr<KernelState> MeanInit(KernelContext* ctx, const KernelInitArgs&
template <typename ArrowType, typename Enable = void>
struct MinMaxState {};

/// Min/max accumulator for boolean input.
///
/// The minimum is the logical AND of every merged value and the maximum is
/// the logical OR, so the identity elements are `min = true`, `max = false`.
template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_boolean<ArrowType>> {
  using ThisType = MinMaxState<ArrowType>;
  using T = typename ArrowType::c_type;

  T min = true;
  T max = false;
  bool has_nulls = false;
  bool has_values = false;

  /// Fold a single value into the running min/max.
  void MergeOne(T value) {
    min = min && value;
    max = max || value;
  }

  /// Combine with another partial state; the flags are OR-ed together.
  ThisType& operator+=(const ThisType& rhs) {
    has_nulls = has_nulls || rhs.has_nulls;
    has_values = has_values || rhs.has_values;
    min = min && rhs.min;
    max = max || rhs.max;
    return *this;
  }
};

template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_integer<ArrowType>> {
using ThisType = MinMaxState<ArrowType>;
using T = typename ArrowType::c_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = std::min(this->min, rhs.min);
this->max = std::max(this->max, rhs.max);
return *this;
Expand All @@ -359,6 +411,7 @@ struct MinMaxState<ArrowType, enable_if_integer<ArrowType>> {
T min = std::numeric_limits<T>::max();
T max = std::numeric_limits<T>::min();
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
Expand All @@ -368,6 +421,7 @@ struct MinMaxState<ArrowType, enable_if_floating_point<ArrowType>> {

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = std::fmin(this->min, rhs.min);
this->max = std::fmax(this->max, rhs.max);
return *this;
Expand All @@ -381,6 +435,7 @@ struct MinMaxState<ArrowType, enable_if_floating_point<ArrowType>> {
T min = std::numeric_limits<T>::infinity();
T max = -std::numeric_limits<T>::infinity();
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
Expand All @@ -397,24 +452,26 @@ struct MinMaxImpl : public ScalarAggregator {

ArrayType arr(batch[0].array());

local.has_nulls = arr.null_count() > 0;
const auto null_count = arr.null_count();
local.has_nulls = null_count > 0;
local.has_values = (arr.length() - null_count) > 0;

if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) {
this->state = local;
return;
}

const auto values = arr.raw_values();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BooleanArray has no raw_values() method.

if (arr.null_count() > 0) {
if (local.has_nulls) {
BitmapReader reader(arr.null_bitmap_data(), arr.offset(), arr.length());
for (int64_t i = 0; i < arr.length(); i++) {
if (reader.IsSet()) {
local.MergeOne(values[i]);
local.MergeOne(arr.Value(i));
}
reader.Next();
}
} else {
for (int64_t i = 0; i < arr.length(); i++) {
local.MergeOne(values[i]);
local.MergeOne(arr.Value(i));
}
}
this->state = local;
Expand All @@ -429,7 +486,8 @@ struct MinMaxImpl : public ScalarAggregator {
using ScalarType = typename TypeTraits<ArrowType>::ScalarType;

std::vector<std::shared_ptr<Scalar>> values;
if (state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) {
if (!state.has_values ||
(state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) {
// (null, null)
values = {std::make_shared<ScalarType>(), std::make_shared<ScalarType>()};
} else {
Expand All @@ -444,6 +502,33 @@ struct MinMaxImpl : public ScalarAggregator {
MinMaxState<ArrowType> state;
};

/// Boolean min/max kernel that derives its result from the array's counts
/// (length, null_count, true_count) rather than merging values one by one.
struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType> {
  using MinMaxImpl::MinMaxImpl;

  void Consume(KernelContext*, const ExecBatch& batch) override {
    StateType batch_state;
    ArrayType arr(batch[0].array());

    const auto total = arr.length();
    const auto num_nulls = arr.null_count();
    const auto num_valid = total - num_nulls;

    batch_state.has_nulls = num_nulls > 0;
    batch_state.has_values = num_valid > 0;

    // With OUTPUT_NULL a single null decides the result, so min/max are left
    // at their defaults and true_count() is never queried.
    const bool null_wins = batch_state.has_nulls &&
                           options.null_handling == MinMaxOptions::OUTPUT_NULL;
    if (!null_wins) {
      const auto num_true = arr.true_count();
      const auto num_false = num_valid - num_true;
      batch_state.max = num_true > 0;    // at least one true
      batch_state.min = num_false == 0;  // no false among the valid values
    }

    state = batch_state;
  }
};

struct MinMaxInitState {
std::unique_ptr<KernelState> state;
KernelContext* ctx;
Expand All @@ -463,6 +548,11 @@ struct MinMaxInitState {
return Status::NotImplemented("No sum implemented");
}

// Boolean input uses the dedicated BooleanMinMaxImpl kernel, constructed with
// the same (out_type, options) arguments as the numeric MinMaxImpl.
Status Visit(const BooleanType&) {
state.reset(new BooleanMinMaxImpl(out_type, options));
return Status::OK();
}

template <typename Type>
enable_if_number<Type, Status> Visit(const Type&) {
state.reset(new MinMaxImpl<Type>(out_type, options));
Expand Down Expand Up @@ -525,18 +615,21 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));

func = std::make_shared<ScalarAggregateFunction>("sum", Arity::Unary());
AddBasicAggKernels(SumInit, {boolean()}, int64(), func.get());
AddBasicAggKernels(SumInit, SignedIntTypes(), int64(), func.get());
AddBasicAggKernels(SumInit, UnsignedIntTypes(), uint64(), func.get());
AddBasicAggKernels(SumInit, FloatingPointTypes(), float64(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

func = std::make_shared<ScalarAggregateFunction>("mean", Arity::Unary());
AddBasicAggKernels(MeanInit, {boolean()}, float64(), func.get());
AddBasicAggKernels(MeanInit, NumericTypes(), float64(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

static auto default_minmax_options = MinMaxOptions::Defaults();
func = std::make_shared<ScalarAggregateFunction>("minmax", Arity::Unary(),
&default_minmax_options);
AddMinMaxKernels(MinMaxInit, {boolean()}, func.get());
AddMinMaxKernels(MinMaxInit, NumericTypes(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ namespace compute {
template <typename I, typename Enable = void>
struct FindAccumulatorType {};

// Boolean inputs accumulate into an unsigned 64-bit integer type.
template <typename I>
struct FindAccumulatorType<I, enable_if_boolean<I>> {
using Type = UInt64Type;
};

template <typename I>
struct FindAccumulatorType<I, enable_if_signed_integer<I>> {
using Type = Int64Type;
Expand Down
87 changes: 82 additions & 5 deletions cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,41 @@ void ValidateSum(const Array& array) {
ValidateSum<ArrowType>(array, NaiveSum<ArrowType>(array));
}

using UnaryOp = Result<Datum>(const Datum&, ExecContext*);

template <UnaryOp& Op, typename ScalarType>
void ValidateBooleanAgg(const std::string& json,
const std::shared_ptr<ScalarType>& expected) {
auto array = ArrayFromJSON(boolean(), json);
auto exp = Datum(expected);
ASSERT_OK_AND_ASSIGN(Datum result, Op(array, nullptr));
ASSERT_TRUE(result.Equals(exp));
}

TEST(TestBooleanAggregation, Sum) {
ValidateBooleanAgg<Sum>("[]", std::make_shared<UInt64Scalar>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't it 0 here?

ValidateBooleanAgg<Sum>("[null]", std::make_shared<UInt64Scalar>());
ValidateBooleanAgg<Sum>("[null, false]", std::make_shared<UInt64Scalar>(0));
ValidateBooleanAgg<Sum>("[true]", std::make_shared<UInt64Scalar>(1));
ValidateBooleanAgg<Sum>("[true, false, true]", std::make_shared<UInt64Scalar>(2));
ValidateBooleanAgg<Sum>("[true, false, true, true, null]",
std::make_shared<UInt64Scalar>(3));
}

// Mean over boolean arrays: expected values are the fraction of true among
// the non-null slots.
TEST(TestBooleanAggregation, Mean) {
// Empty or all-null input is expected to yield a default-constructed scalar.
ValidateBooleanAgg<Mean>("[]", std::make_shared<DoubleScalar>());
ValidateBooleanAgg<Mean>("[null]", std::make_shared<DoubleScalar>());
// Nulls do not count towards the denominator.
ValidateBooleanAgg<Mean>("[null, false]", std::make_shared<DoubleScalar>(0));
ValidateBooleanAgg<Mean>("[true]", std::make_shared<DoubleScalar>(1));
ValidateBooleanAgg<Mean>("[true, false, true, false]",
std::make_shared<DoubleScalar>(0.5));
ValidateBooleanAgg<Mean>("[true, null]", std::make_shared<DoubleScalar>(1));
ValidateBooleanAgg<Mean>("[true, null, false, true, true]",
std::make_shared<DoubleScalar>(0.75));
ValidateBooleanAgg<Mean>("[true, null, false, false, false]",
std::make_shared<DoubleScalar>(0.25));
}

template <typename ArrowType>
class TestNumericSumKernel : public ::testing::Test {};

Expand Down Expand Up @@ -346,10 +381,10 @@ TYPED_TEST(TestRandomNumericMeanKernel, RandomArrayMean) {
///

template <typename ArrowType>
class TestNumericMinMaxKernel : public ::testing::Test {
class TestPrimitiveMinMaxKernel : public ::testing::Test {
using Traits = TypeTraits<ArrowType>;
using ArrayType = typename Traits::ArrayType;
using c_type = typename ArrayType::value_type;
using c_type = typename ArrowType::c_type;
using ScalarType = typename Traits::ScalarType;

public:
Expand Down Expand Up @@ -401,15 +436,57 @@ class TestNumericMinMaxKernel : public ::testing::Test {
};

template <typename ArrowType>
class TestFloatingMinMaxKernel : public TestNumericMinMaxKernel<ArrowType> {};
class TestIntegerMinMaxKernel : public TestPrimitiveMinMaxKernel<ArrowType> {};

template <typename ArrowType>
class TestFloatingMinMaxKernel : public TestPrimitiveMinMaxKernel<ArrowType> {};

class TestBooleanMinMaxKernel : public TestPrimitiveMinMaxKernel<BooleanType> {};

// Boolean min/max over plain and chunked inputs, first with the default
// options (nulls skipped) and then with OUTPUT_NULL (any null gives null).
TEST_F(TestBooleanMinMaxKernel, Basics) {
  MinMaxOptions options;
  std::vector<std::string> chunked_input0 = {"[]", "[]"};
  std::vector<std::string> chunked_input1 = {"[true, true, null]", "[true, null]"};
  std::vector<std::string> chunked_input2 = {"[false, false, false]", "[false]"};
  std::vector<std::string> chunked_input3 = {"[true, null]", "[null, false]"};

  // SKIP nulls by default
  this->AssertMinMaxIsNull("[]", options);
  this->AssertMinMaxIsNull("[null, null, null]", options);
  this->AssertMinMaxIs("[false, false, false]", false, false, options);
  this->AssertMinMaxIs("[false, false, false, null]", false, false, options);
  // All-true without nulls; this replaces an assertion that duplicated the
  // "[true, null, true, true]" case on the next line.
  this->AssertMinMaxIs("[true, true, true]", true, true, options);
  this->AssertMinMaxIs("[true, null, true, true]", true, true, options);
  this->AssertMinMaxIs("[true, null, false, true]", false, true, options);
  this->AssertMinMaxIsNull(chunked_input0, options);
  this->AssertMinMaxIs(chunked_input1, true, true, options);
  this->AssertMinMaxIs(chunked_input2, false, false, options);
  this->AssertMinMaxIs(chunked_input3, false, true, options);

  // OUTPUT_NULL: any null in the input makes the result null
  options = MinMaxOptions(MinMaxOptions::OUTPUT_NULL);
  this->AssertMinMaxIsNull("[]", options);
  this->AssertMinMaxIsNull("[null, null, null]", options);
  this->AssertMinMaxIsNull("[false, null, false]", options);
  this->AssertMinMaxIsNull("[true, null]", options);
  this->AssertMinMaxIs("[true, true, true]", true, true, options);
  this->AssertMinMaxIs("[false, false]", false, false, options);
  this->AssertMinMaxIs("[false, true]", false, true, options);
  this->AssertMinMaxIsNull(chunked_input0, options);
  this->AssertMinMaxIsNull(chunked_input1, options);
  this->AssertMinMaxIs(chunked_input2, false, false, options);
  this->AssertMinMaxIsNull(chunked_input3, options);
}

TYPED_TEST_SUITE(TestNumericMinMaxKernel, IntegralArrowTypes);
TYPED_TEST(TestNumericMinMaxKernel, Basics) {
TYPED_TEST_SUITE(TestIntegerMinMaxKernel, IntegralArrowTypes);
TYPED_TEST(TestIntegerMinMaxKernel, Basics) {
MinMaxOptions options;
std::vector<std::string> chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 1, null, 3, 4]"};
std::vector<std::string> chunked_input2 = {"[5, null, 2, 3, 4]", "[9, 1, 2, 3, 4]"};
std::vector<std::string> chunked_input3 = {"[5, 1, 2, 3, null]", "[9, 1, null, 3, 4]"};

// SKIP nulls by default
this->AssertMinMaxIsNull("[]", options);
this->AssertMinMaxIsNull("[null, null, null]", options);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So these were returning the types' extreme values.

this->AssertMinMaxIs("[5, 1, 2, 3, 4]", 1, 5, options);
this->AssertMinMaxIs("[5, null, 2, 3, 4]", 2, 5, options);
this->AssertMinMaxIs(chunked_input1, 1, 9, options);
Expand Down
7 changes: 1 addition & 6 deletions r/R/compute.R
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,10 @@ scalar_aggregate <- function(FUN, ..., na.rm = FALSE) {
if (FUN %in% c("mean", "sum")) {
# Arrow sum/mean function always drops NAs so handle that here
# https://issues.apache.org/jira/browse/ARROW-9054
return(Scalar$create(NA_integer_, type = a$type))
return(Scalar$create(NA_real_))
}
}

if (inherits(a$type, "Boolean")) {
# Bool sum/mean not implemented so cast to int
# https://issues.apache.org/jira/browse/ARROW-9055
a <- a$cast(int8())
}
Scalar$create(call_function(FUN, a, options = list(na.rm = na.rm)))
}

Expand Down
9 changes: 6 additions & 3 deletions r/tests/testthat/test-compute.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ test_that("sum.Array", {
expect_is(sum(na, na.rm = TRUE), "Scalar")
expect_identical(as.numeric(sum(na, na.rm = TRUE)), sum(floats, na.rm = TRUE))

bools <- c(TRUE, TRUE, FALSE)
bools <- c(TRUE, NA, TRUE, FALSE)
b <- Array$create(bools)
expect_identical(as.integer(sum(b)), sum(bools))
expect_identical(as.integer(sum(b, na.rm = TRUE)), sum(bools, na.rm = TRUE))
})

test_that("sum.ChunkedArray", {
Expand Down Expand Up @@ -73,9 +74,10 @@ test_that("mean.Array", {
expect_is(mean(na, na.rm = TRUE), "Scalar")
expect_identical(as.vector(mean(na, na.rm = TRUE)), mean(floats, na.rm = TRUE))

bools <- c(TRUE, TRUE, FALSE)
bools <- c(TRUE, NA, TRUE, FALSE)
b <- Array$create(bools)
expect_identical(as.vector(mean(b)), mean(bools))
expect_identical(as.integer(sum(b, na.rm = TRUE)), sum(bools, na.rm = TRUE))
})

test_that("mean.ChunkedArray", {
Expand Down Expand Up @@ -113,5 +115,6 @@ test_that("min/max.Array", {

bools <- c(TRUE, TRUE, FALSE)
b <- Array$create(bools)
expect_identical(as.vector(min(b)), min(bools))
# R is inconsistent here: typeof(min(NA)) == "integer", not "logical"
expect_identical(as.vector(min(b)), as.logical(min(bools)))
})