Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ if(ARROW_COMPUTE)
compute/kernels/boolean.cc
compute/kernels/cast.cc
compute/kernels/hash.cc
compute/kernels/mean.cc
compute/kernels/sum.cc
compute/kernels/util-internal.cc)
endif()
Expand Down
191 changes: 123 additions & 68 deletions cpp/src/arrow/compute/kernels/aggregate-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

#include <gtest/gtest.h>

#include "arrow/array.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/mean.h"
#include "arrow/compute/kernels/sum-internal.h"
#include "arrow/compute/kernels/sum.h"
#include "arrow/compute/test-util.h"
#include "arrow/type.h"
Expand All @@ -38,94 +43,69 @@ using std::vector;
namespace arrow {
namespace compute {

// Primary DatumEqual template, used when no partial specialization is
// selected for `Type`. Its EnsureEqual accepts two datums and checks nothing.
template <typename Type, typename Enable = void>
struct DatumEqual {
  static void EnsureEqual(const Datum& /*lhs*/, const Datum& /*rhs*/) {}
};

// DatumEqual specialization for floating-point types.
//
// The enable_if condition must read the lowercase `value` member, matching the
// non-floating-point specialization; with any other spelling the substitution
// fails, this specialization is silently discarded and floating-point results
// would be "compared" by the no-op primary template.
template <typename Type>
struct DatumEqual<Type, typename std::enable_if<IsFloatingPoint<Type>::value>::type> {
  // Absolute tolerance passed to ASSERT_NEAR when comparing the two values.
  static constexpr double kArbitraryDoubleErrorBound = 1.0;
  using ScalarType = typename TypeTraits<Type>::ScalarType;

  // Asserts that both datums have the same kind. When they are scalars, also
  // asserts that their type ids match and that their values differ by at most
  // kArbitraryDoubleErrorBound. Non-scalar datums get no further checks.
  static void EnsureEqual(const Datum& lhs, const Datum& rhs) {
    ASSERT_EQ(lhs.kind(), rhs.kind());
    if (lhs.kind() == Datum::SCALAR) {
      auto left = static_cast<const ScalarType*>(lhs.scalar().get());
      auto right = static_cast<const ScalarType*>(rhs.scalar().get());
      ASSERT_EQ(left->type->id(), right->type->id());
      ASSERT_NEAR(left->value, right->value, kArbitraryDoubleErrorBound);
    }
  }
};

// DatumEqual specialization for every type that is not floating point:
// scalar values are compared for exact equality.
template <typename Type>
struct DatumEqual<Type, typename std::enable_if<!IsFloatingPoint<Type>::value>::type> {
  using ScalarType = typename TypeTraits<Type>::ScalarType;

  // Asserts that both datums have the same kind; for scalars, additionally
  // asserts matching type ids and identical values.
  static void EnsureEqual(const Datum& lhs, const Datum& rhs) {
    ASSERT_EQ(lhs.kind(), rhs.kind());
    if (lhs.kind() != Datum::SCALAR) {
      // Only scalar datums are inspected beyond their kind.
      return;
    }

    const auto* lhs_scalar = static_cast<const ScalarType*>(lhs.scalar().get());
    const auto* rhs_scalar = static_cast<const ScalarType*>(rhs.scalar().get());
    ASSERT_EQ(lhs_scalar->type->id(), rhs_scalar->type->id());
    ASSERT_EQ(lhs_scalar->value, rhs_scalar->value);
  }
};

template <typename ArrowType>
void ValidateSum(FunctionContext* ctx, const Array& input, Datum expected) {
using OutputType = typename FindAccumulatorType<ArrowType>::Type;
Datum result;
ASSERT_OK(Sum(ctx, input, &result));
DatumEqual<OutputType>::EnsureEqual(result, expected);
}
using SumResult =
std::pair<typename FindAccumulatorType<ArrowType>::Type::c_type, size_t>;

template <typename ArrowType>
void ValidateSum(FunctionContext* ctx, const char* json, Datum expected) {
auto array = ArrayFromJSON(TypeTraits<ArrowType>::type_singleton(), json);
ValidateSum<ArrowType>(ctx, *array, expected);
}

template <typename ArrowType>
static Datum DummySum(const Array& array) {
static SumResult<ArrowType> NaiveSumPartial(const Array& array) {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using SumType = typename FindAccumulatorType<ArrowType>::Type;
using SumScalarType = typename TypeTraits<SumType>::ScalarType;
using ResultType = SumResult<ArrowType>;

typename SumType::c_type sum = 0;
int64_t count = 0;
ResultType result;

auto data = array.data();
internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length());
const auto& array_numeric = reinterpret_cast<const ArrayType&>(array);
const auto values = array_numeric.raw_values();
for (int64_t i = 0; i < array.length(); i++) {
if (reader.IsSet()) {
sum += values[i];
count++;
result.first += values[i];
result.second++;
}

reader.Next();
}

if (count > 0) {
return Datum(std::make_shared<SumScalarType>(sum));
} else {
return Datum(std::make_shared<SumScalarType>(0, false));
}
return result;
}

// Reference implementation of Sum used to produce expected values in tests.
// Wraps the (sum, count) pair from NaiveSumPartial in a scalar of the
// accumulator type; the scalar is marked valid only if the count is non-zero.
template <typename ArrowType>
static Datum NaiveSum(const Array& array) {
  using AccumulatorType = typename FindAccumulatorType<ArrowType>::Type;
  using AccumulatorScalar = typename TypeTraits<AccumulatorType>::ScalarType;

  auto partial = NaiveSumPartial<ArrowType>(array);
  const bool has_values = partial.second > 0;
  auto scalar = std::make_shared<AccumulatorScalar>(partial.first, has_values);

  return Datum(scalar);
}

// Runs the Sum kernel over `input` and checks the outcome against `expected`,
// comparing with the DatumEqual flavour for ArrowType's accumulator type.
template <typename ArrowType>
void ValidateSum(FunctionContext* ctx, const Array& input, Datum expected) {
  using AccumulatorType = typename FindAccumulatorType<ArrowType>::Type;

  Datum actual;
  ASSERT_OK(Sum(ctx, input, &actual));
  DatumEqual<AccumulatorType>::EnsureEqual(actual, expected);
}

// Convenience overload: builds an array of ArrowType from its JSON text form
// and validates the Sum kernel on it against `expected`.
template <typename ArrowType>
void ValidateSum(FunctionContext* ctx, const char* json, Datum expected) {
  const auto type = TypeTraits<ArrowType>::type_singleton();
  auto parsed = ArrayFromJSON(type, json);
  ValidateSum<ArrowType>(ctx, *parsed, expected);
}

template <typename ArrowType>
void ValidateSum(FunctionContext* ctx, const Array& array) {
ValidateSum<ArrowType>(ctx, array, DummySum<ArrowType>(array));
ValidateSum<ArrowType>(ctx, array, NaiveSum<ArrowType>(array));
}

template <typename ArrowType>
class TestSumKernelNumeric : public ComputeFixture, public TestBase {};
class TestNumericSumKernel : public ComputeFixture, public TestBase {};

TYPED_TEST_CASE(TestSumKernelNumeric, NumericArrowTypes);
TYPED_TEST(TestSumKernelNumeric, SimpleSum) {
TYPED_TEST_CASE(TestNumericSumKernel, NumericArrowTypes);
TYPED_TEST(TestNumericSumKernel, SimpleSum) {
using SumType = typename FindAccumulatorType<TypeParam>::Type;
using ScalarType = typename TypeTraits<SumType>::ScalarType;
using T = typename TypeParam::c_type;
Expand All @@ -145,10 +125,10 @@ TYPED_TEST(TestSumKernelNumeric, SimpleSum) {
}

template <typename ArrowType>
class TestRandomSumKernelNumeric : public ComputeFixture, public TestBase {};
class TestRandomNumericSumKernel : public ComputeFixture, public TestBase {};

TYPED_TEST_CASE(TestRandomSumKernelNumeric, NumericArrowTypes);
TYPED_TEST(TestRandomSumKernelNumeric, RandomArraySum) {
TYPED_TEST_CASE(TestRandomNumericSumKernel, NumericArrowTypes);
TYPED_TEST(TestRandomNumericSumKernel, RandomArraySum) {
auto rand = random::RandomArrayGenerator(0x5487655);
for (size_t i = 3; i < 14; i++) {
for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) {
Expand All @@ -161,7 +141,7 @@ TYPED_TEST(TestRandomSumKernelNumeric, RandomArraySum) {
}
}

TYPED_TEST(TestRandomSumKernelNumeric, RandomSliceArraySum) {
TYPED_TEST(TestRandomNumericSumKernel, RandomSliceArraySum) {
auto arithmetic = ArrayFromJSON(TypeTraits<TypeParam>::type_singleton(),
"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]");
ValidateSum<TypeParam>(&this->ctx_, *arithmetic);
Expand All @@ -175,12 +155,87 @@ TYPED_TEST(TestRandomSumKernelNumeric, RandomSliceArraySum) {
const int64_t length = 1U << 6;
auto array = rand.Numeric<TypeParam>(length, 0, 10, 0.5);
for (size_t i = 1; i < 16; i++) {
for (size_t j = 1; i < 16; i++) {
for (size_t j = 1; j < 16; j++) {
auto slice = array->Slice(i, length - j);
ValidateSum<TypeParam>(&this->ctx_, *slice);
}
}
}

// Reference implementation of Mean used to produce expected values in tests.
// Divides the sum from NaiveSumPartial by the number of counted values and
// returns a double scalar that is marked valid only if that count is non-zero.
template <typename ArrowType>
static Datum NaiveMean(const Array& array) {
  using MeanScalarType = typename TypeTraits<DoubleType>::ScalarType;

  const auto partial = NaiveSumPartial<ArrowType>(array);
  const bool has_values = partial.second > 0;

  // Fall back to a divisor of 1 when nothing was counted, so the value stored
  // in the (invalid) scalar is computed without dividing by zero.
  const double divisor = static_cast<double>(has_values ? partial.second : 1UL);
  const double mean = static_cast<double>(partial.first) / divisor;

  return Datum(std::make_shared<MeanScalarType>(mean, has_values));
}

// Runs the Mean kernel over `input` and checks the outcome against `expected`,
// comparing with the DatumEqual flavour for DoubleType's accumulator type.
template <typename ArrowType>
void ValidateMean(FunctionContext* ctx, const Array& input, Datum expected) {
  using MeanOutputType = typename FindAccumulatorType<DoubleType>::Type;

  Datum actual;
  ASSERT_OK(Mean(ctx, input, &actual));
  DatumEqual<MeanOutputType>::EnsureEqual(actual, expected);
}

// Convenience overload: builds an array of ArrowType from its JSON text form
// and validates the Mean kernel on it against `expected`.
template <typename ArrowType>
void ValidateMean(FunctionContext* ctx, const char* json, Datum expected) {
  const auto type = TypeTraits<ArrowType>::type_singleton();
  auto parsed = ArrayFromJSON(type, json);
  ValidateMean<ArrowType>(ctx, *parsed, expected);
}

// Self-checking overload: the expected value is computed from `array` itself
// with the NaiveMean reference implementation.
template <typename ArrowType>
void ValidateMean(FunctionContext* ctx, const Array& array) {
  const Datum expected = NaiveMean<ArrowType>(array);
  ValidateMean<ArrowType>(ctx, array, expected);
}

// Typed-test fixture for the Mean kernel tests; it adds no members of its own
// beyond what ComputeFixture and TestBase provide.
template <typename ArrowType>
class TestMeanKernelNumeric : public ComputeFixture, public TestBase {};

TYPED_TEST_CASE(TestMeanKernelNumeric, NumericArrowTypes);
TYPED_TEST(TestMeanKernelNumeric, SimpleMean) {
using ScalarType = typename TypeTraits<DoubleType>::ScalarType;

ValidateMean<TypeParam>(&this->ctx_, "[]",
Datum(std::make_shared<ScalarType>(0.0, false)));

ValidateMean<TypeParam>(&this->ctx_, "[null]",
Datum(std::make_shared<ScalarType>(0.0, false)));

ValidateMean<TypeParam>(&this->ctx_, "[1, null, 1]",
Datum(std::make_shared<ScalarType>(1.0)));

ValidateMean<TypeParam>(&this->ctx_, "[1, 2, 3, 4, 5, 6, 7, 8]",
Datum(std::make_shared<ScalarType>(4.5)));

ValidateMean<TypeParam>(&this->ctx_, "[0, 0, 0, 0, 0, 0, 0, 0]",
Datum(std::make_shared<ScalarType>(0.0)));

ValidateMean<TypeParam>(&this->ctx_, "[1, 1, 1, 1, 1, 1, 1, 1]",
Datum(std::make_shared<ScalarType>(1.0)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For self-documenting code you could do:

auto null = std::make_shared<ScalarType>(0.0, false);

Also keep in mind that Datum has implicit ctors, so these Datum(...) are unneeded

}

template <typename ArrowType>
class TestRandomNumericMeanKernel : public ComputeFixture, public TestBase {};

TYPED_TEST_CASE(TestRandomNumericMeanKernel, NumericArrowTypes);
TYPED_TEST(TestRandomNumericMeanKernel, RandomArrayMean) {
auto rand = random::RandomArrayGenerator(0x8afc055);
for (size_t i = 3; i < 14; i++) {
for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) {
for (auto length_adjust : {-2, -1, 0, 1, 2}) {
int64_t length = (1UL << i) + length_adjust;
auto array = rand.Numeric<TypeParam>(length, 0, 100, null_probability);
ValidateMean<TypeParam>(&this->ctx_, *array);
}
}
}
}

} // namespace compute
} // namespace arrow
115 changes: 115 additions & 0 deletions cpp/src/arrow/compute/kernels/mean.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/kernels/mean.h"

#include <algorithm>

#include "arrow/compute/kernels/sum-internal.h"

namespace arrow {
namespace compute {

// Running state for computing an arithmetic mean: the number of values seen
// and their sum, the sum being held in the accumulator type of ArrowType.
// States can be merged with + / += and turned into the final result with
// Finalize().
template <typename ArrowType,
          typename SumType = typename FindAccumulatorType<ArrowType>::Type>
struct MeanState {
  using ThisType = MeanState<ArrowType, SumType>;

  // Returns the merge of this state and `rhs`, leaving both untouched.
  //
  // Implemented on top of operator+= rather than by constructing
  // ThisType(count, sum): the struct declares no such constructor, and
  // parenthesized aggregate initialization is only accepted from C++20 on, so
  // that form fails to compile as soon as this operator is instantiated.
  ThisType operator+(const ThisType& rhs) const {
    ThisType merged = *this;
    merged += rhs;
    return merged;
  }

  // Folds the count and sum of `rhs` into this state.
  ThisType& operator+=(const ThisType& rhs) {
    this->count += rhs.count;
    this->sum += rhs.sum;

    return *this;
  }

  // Produces the mean as a double scalar. The scalar is flagged invalid when
  // no value was counted; in that case the divisor falls back to 1 so the
  // stored value is computed without dividing by zero.
  std::shared_ptr<Scalar> Finalize() const {
    using ScalarType = typename TypeTraits<DoubleType>::ScalarType;

    const bool is_valid = count > 0;
    const double divisor = static_cast<double>(is_valid ? count : 1UL);
    const double mean = static_cast<double>(sum) / divisor;

    return std::make_shared<ScalarType>(mean, is_valid);
  }

  // Data type of the scalar returned by Finalize(): the DoubleType singleton.
  static std::shared_ptr<DataType> out_type() {
    return TypeTraits<DoubleType>::type_singleton();
  }

  // Number of values accumulated so far.
  size_t count = 0;
  // Sum of the values accumulated so far.
  typename SumType::c_type sum = 0;
};

// Expands to a `case` label for Arrow type T that returns a
// SumAggregateFunction parameterized with MeanState<T>, upcast to
// AggregateFunction. The invocation site supplies the trailing semicolon.
#define MEAN_AGG_FN_CASE(T)                             \
  case T::type_id:                                      \
    return std::static_pointer_cast<AggregateFunction>( \
        std::make_shared<SumAggregateFunction<T, MeanState<T>>>())

// Returns the mean aggregate function matching the id of `type`, or nullptr
// when the type is not one of the integer or floating-point types listed
// below. `ctx` is not used by this function.
std::shared_ptr<AggregateFunction> MakeMeanAggregateFunction(const DataType& type,
                                                             FunctionContext* ctx) {
  switch (type.id()) {
    MEAN_AGG_FN_CASE(UInt8Type);
    MEAN_AGG_FN_CASE(Int8Type);
    MEAN_AGG_FN_CASE(UInt16Type);
    MEAN_AGG_FN_CASE(Int16Type);
    MEAN_AGG_FN_CASE(UInt32Type);
    MEAN_AGG_FN_CASE(Int32Type);
    MEAN_AGG_FN_CASE(UInt64Type);
    MEAN_AGG_FN_CASE(Int64Type);
    MEAN_AGG_FN_CASE(FloatType);
    MEAN_AGG_FN_CASE(DoubleType);
    default:
      break;
  }

  // No mean implementation exists for this type id.
  return nullptr;
}

#undef MEAN_AGG_FN_CASE

// Creates the unary aggregate kernel that computes the mean for `type` and
// stores it in `kernel`. Returns Status::Invalid, leaving `kernel` untouched,
// when MakeMeanAggregateFunction has no implementation for `type`.
static Status GetMeanKernel(FunctionContext* ctx, const DataType& type,
                            std::shared_ptr<AggregateUnaryKernel>& kernel) {
  auto mean_function = MakeMeanAggregateFunction(type, ctx);
  if (mean_function == nullptr) {
    return Status::Invalid("No mean for type ", type);
  }

  kernel = std::make_shared<AggregateUnaryKernel>(mean_function);
  return Status::OK();
}

Status Mean(FunctionContext* ctx, const Datum& value, Datum* out) {
std::shared_ptr<AggregateUnaryKernel> kernel;

auto data_type = value.type();
if (data_type == nullptr)
return Status::Invalid("Datum must be array-like");
else if (!is_integer(data_type->id()) && !is_floating(data_type->id()))
return Status::Invalid("Datum must contain a NumericType");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to always use braces


RETURN_NOT_OK(GetMeanKernel(ctx, *data_type, kernel));

return kernel->Call(ctx, value, out);
}

// Array convenience overload: wraps the array's underlying data in a Datum and
// forwards to the Datum-based Mean.
Status Mean(FunctionContext* ctx, const Array& array, Datum* out) {
  const Datum value = array.data();
  return Mean(ctx, value, out);
}

} // namespace compute
} // namespace arrow
Loading