diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 94eba0ce1a6..3c6a3990cf2 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -147,6 +147,7 @@ if(ARROW_COMPUTE) compute/kernels/boolean.cc compute/kernels/cast.cc compute/kernels/hash.cc + compute/kernels/mean.cc compute/kernels/sum.cc compute/kernels/util-internal.cc) endif() diff --git a/cpp/src/arrow/compute/kernels/aggregate-test.cc b/cpp/src/arrow/compute/kernels/aggregate-test.cc index ca4474461c9..bdf50f5ac7b 100644 --- a/cpp/src/arrow/compute/kernels/aggregate-test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate-test.cc @@ -15,13 +15,18 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include #include +#include #include #include "arrow/array.h" #include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/mean.h" +#include "arrow/compute/kernels/sum-internal.h" #include "arrow/compute/kernels/sum.h" #include "arrow/compute/test-util.h" #include "arrow/type.h" @@ -38,63 +43,16 @@ using std::vector; namespace arrow { namespace compute { -template -struct DatumEqual { - static void EnsureEqual(const Datum& lhs, const Datum& rhs) {} -}; - -template -struct DatumEqual::Value>::type> { - static constexpr double kArbitraryDoubleErrorBound = 1.0; - using ScalarType = typename TypeTraits::ScalarType; - - static void EnsureEqual(const Datum& lhs, const Datum& rhs) { - ASSERT_EQ(lhs.kind(), rhs.kind()); - if (lhs.kind() == Datum::SCALAR) { - auto left = static_cast(lhs.scalar().get()); - auto right = static_cast(rhs.scalar().get()); - ASSERT_EQ(left->type->id(), right->type->id()); - ASSERT_NEAR(left->value, right->value, kArbitraryDoubleErrorBound); - } - } -}; - -template -struct DatumEqual::value>::type> { - using ScalarType = typename TypeTraits::ScalarType; - static void EnsureEqual(const Datum& lhs, const Datum& rhs) { - ASSERT_EQ(lhs.kind(), rhs.kind()); - if (lhs.kind() == Datum::SCALAR) { - auto left = static_cast(lhs.scalar().get()); - auto right = static_cast(rhs.scalar().get()); - ASSERT_EQ(left->type->id(), right->type->id()); - ASSERT_EQ(left->value, right->value); - } - } -}; - template -void ValidateSum(FunctionContext* ctx, const Array& input, Datum expected) { - using OutputType = typename FindAccumulatorType::Type; - Datum result; - ASSERT_OK(Sum(ctx, input, &result)); - DatumEqual::EnsureEqual(result, expected); -} +using SumResult = + std::pair::Type::c_type, size_t>; template -void ValidateSum(FunctionContext* ctx, const char* json, Datum expected) { - auto array = ArrayFromJSON(TypeTraits::type_singleton(), json); - ValidateSum(ctx, *array, expected); -} - -template -static Datum DummySum(const Array& array) { +static SumResult NaiveSumPartial(const Array& array) { using ArrayType = typename TypeTraits::ArrayType; - using SumType = typename FindAccumulatorType::Type; - using SumScalarType = typename TypeTraits::ScalarType; + using ResultType = SumResult; - typename SumType::c_type sum = 0; - int64_t count = 0; + ResultType result; auto data = array.data(); internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); @@ -102,30 +60,52 @@ static Datum DummySum(const Array& array) { const auto values = array_numeric.raw_values(); for (int64_t i = 0; i < array.length(); i++) { if (reader.IsSet()) { - sum += values[i]; - count++; + result.first += values[i]; + result.second++; } reader.Next(); } - if (count > 0) { - return Datum(std::make_shared(sum)); - } else { - return Datum(std::make_shared(0, false)); - } + return result; +} + +template +static Datum NaiveSum(const Array& array) { + using SumType = typename FindAccumulatorType::Type; + using SumScalarType = typename TypeTraits::ScalarType; + + auto result = NaiveSumPartial(array); + bool is_valid = result.second > 0; + + return Datum(std::make_shared(result.first, is_valid)); +} + +template +void ValidateSum(FunctionContext* ctx, const Array& input, Datum expected) { + using OutputType = typename FindAccumulatorType::Type; + + Datum result; + ASSERT_OK(Sum(ctx, input, &result)); + DatumEqual::EnsureEqual(result, expected); +} + +template +void ValidateSum(FunctionContext* ctx, const char* json, Datum expected) { + auto array = ArrayFromJSON(TypeTraits::type_singleton(), json); + ValidateSum(ctx, *array, expected); } template void ValidateSum(FunctionContext* ctx, const Array& array) { - ValidateSum(ctx, array, DummySum(array)); + ValidateSum(ctx, array, NaiveSum(array)); } template -class TestSumKernelNumeric : public ComputeFixture, public TestBase {}; +class TestNumericSumKernel : public ComputeFixture, public TestBase {}; -TYPED_TEST_CASE(TestSumKernelNumeric, NumericArrowTypes); -TYPED_TEST(TestSumKernelNumeric, SimpleSum) { +TYPED_TEST_CASE(TestNumericSumKernel, NumericArrowTypes); +TYPED_TEST(TestNumericSumKernel, SimpleSum) { using SumType = typename FindAccumulatorType::Type; using ScalarType = typename TypeTraits::ScalarType; using T = typename TypeParam::c_type; @@ -145,10 +125,10 @@ TYPED_TEST(TestSumKernelNumeric, SimpleSum) { } template -class TestRandomSumKernelNumeric : public ComputeFixture, public TestBase {}; +class TestRandomNumericSumKernel : public ComputeFixture, public TestBase {}; -TYPED_TEST_CASE(TestRandomSumKernelNumeric, NumericArrowTypes); -TYPED_TEST(TestRandomSumKernelNumeric, RandomArraySum) { +TYPED_TEST_CASE(TestRandomNumericSumKernel, NumericArrowTypes); +TYPED_TEST(TestRandomNumericSumKernel, RandomArraySum) { auto rand = random::RandomArrayGenerator(0x5487655); for (size_t i = 3; i < 14; i++) { for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { @@ -161,7 +141,7 @@ TYPED_TEST(TestRandomSumKernelNumeric, RandomArraySum) { } } -TYPED_TEST(TestRandomSumKernelNumeric, RandomSliceArraySum) { +TYPED_TEST(TestRandomNumericSumKernel, RandomSliceArraySum) { auto arithmetic = ArrayFromJSON(TypeTraits::type_singleton(), "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]"); ValidateSum(&this->ctx_, *arithmetic); @@ -175,12 +155,87 @@ TYPED_TEST(TestRandomSumKernelNumeric, RandomSliceArraySum) { const int64_t length = 1U << 6; auto array = rand.Numeric(length, 0, 10, 0.5); for (size_t i = 1; i < 16; i++) { - for (size_t j = 1; i < 16; i++) { + for (size_t j = 1; j < 16; j++) { auto slice = array->Slice(i, length - j); ValidateSum(&this->ctx_, *slice); } } } +template +static Datum NaiveMean(const Array& array) { + using MeanScalarType = typename TypeTraits::ScalarType; + + const auto result = NaiveSumPartial(array); + const double mean = static_cast(result.first) / + static_cast(result.second ? result.second : 1UL); + const bool is_valid = result.second > 0; + + return Datum(std::make_shared(mean, is_valid)); +} + +template +void ValidateMean(FunctionContext* ctx, const Array& input, Datum expected) { + using OutputType = typename FindAccumulatorType::Type; + + Datum result; + ASSERT_OK(Mean(ctx, input, &result)); + DatumEqual::EnsureEqual(result, expected); +} + +template +void ValidateMean(FunctionContext* ctx, const char* json, Datum expected) { + auto array = ArrayFromJSON(TypeTraits::type_singleton(), json); + ValidateMean(ctx, *array, expected); +} + +template +void ValidateMean(FunctionContext* ctx, const Array& array) { + ValidateMean(ctx, array, NaiveMean(array)); +} + +template +class TestMeanKernelNumeric : public ComputeFixture, public TestBase {}; + +TYPED_TEST_CASE(TestMeanKernelNumeric, NumericArrowTypes); +TYPED_TEST(TestMeanKernelNumeric, SimpleMean) { + using ScalarType = typename TypeTraits::ScalarType; + + ValidateMean(&this->ctx_, "[]", + Datum(std::make_shared(0.0, false))); + + ValidateMean(&this->ctx_, "[null]", + Datum(std::make_shared(0.0, false))); + + ValidateMean(&this->ctx_, "[1, null, 1]", + Datum(std::make_shared(1.0))); + + ValidateMean(&this->ctx_, "[1, 2, 3, 4, 5, 6, 7, 8]", + Datum(std::make_shared(4.5))); + + ValidateMean(&this->ctx_, "[0, 0, 0, 0, 0, 0, 0, 0]", + Datum(std::make_shared(0.0))); + + ValidateMean(&this->ctx_, "[1, 1, 1, 1, 1, 1, 1, 1]", + Datum(std::make_shared(1.0))); +} + +template +class TestRandomNumericMeanKernel : public ComputeFixture, public TestBase {}; + +TYPED_TEST_CASE(TestRandomNumericMeanKernel, NumericArrowTypes); +TYPED_TEST(TestRandomNumericMeanKernel, RandomArrayMean) { + auto rand = random::RandomArrayGenerator(0x8afc055); + for (size_t i = 3; i < 14; i++) { + for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { + for (auto length_adjust : {-2, -1, 0, 1, 2}) { + int64_t length = (1UL << i) + length_adjust; + auto array = rand.Numeric(length, 0, 100, null_probability); + ValidateMean(&this->ctx_, *array); + } + } + } +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/mean.cc b/cpp/src/arrow/compute/kernels/mean.cc new file mode 100644 index 00000000000..d1eaf15682a --- /dev/null +++ b/cpp/src/arrow/compute/kernels/mean.cc @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/kernels/mean.h" + +#include + +#include "arrow/compute/kernels/sum-internal.h" + +namespace arrow { +namespace compute { + +template ::Type> +struct MeanState { + using ThisType = MeanState; + + ThisType operator+(const ThisType& rhs) const { + return ThisType(this->count + rhs.count, this->sum + rhs.sum); + } + + ThisType& operator+=(const ThisType& rhs) { + this->count += rhs.count; + this->sum += rhs.sum; + + return *this; + } + + std::shared_ptr Finalize() const { + using ScalarType = typename TypeTraits::ScalarType; + + const bool is_valid = count > 0; + const double divisor = static_cast(is_valid ? count : 1UL); + const double mean = static_cast(sum) / divisor; + + return std::make_shared(mean, is_valid); + } + + static std::shared_ptr out_type() { + return TypeTraits::type_singleton(); + } + + size_t count = 0; + typename SumType::c_type sum = 0; +}; + +#define MEAN_AGG_FN_CASE(T) \ + case T::type_id: \ + return std::static_pointer_cast( \ + std::make_shared>>()); + +std::shared_ptr MakeMeanAggregateFunction(const DataType& type, + FunctionContext* ctx) { + switch (type.id()) { + MEAN_AGG_FN_CASE(UInt8Type); + MEAN_AGG_FN_CASE(Int8Type); + MEAN_AGG_FN_CASE(UInt16Type); + MEAN_AGG_FN_CASE(Int16Type); + MEAN_AGG_FN_CASE(UInt32Type); + MEAN_AGG_FN_CASE(Int32Type); + MEAN_AGG_FN_CASE(UInt64Type); + MEAN_AGG_FN_CASE(Int64Type); + MEAN_AGG_FN_CASE(FloatType); + MEAN_AGG_FN_CASE(DoubleType); + default: + return nullptr; + } + +#undef MEAN_AGG_FN_CASE +} + +static Status GetMeanKernel(FunctionContext* ctx, const DataType& type, + std::shared_ptr& kernel) { + std::shared_ptr aggregate = MakeMeanAggregateFunction(type, ctx); + if (!aggregate) return Status::Invalid("No mean for type ", type); + + kernel = std::make_shared(aggregate); + + return Status::OK(); +} + +Status Mean(FunctionContext* ctx, const Datum& value, Datum* out) { + std::shared_ptr kernel; + + auto data_type = value.type(); + if (data_type == nullptr) + return Status::Invalid("Datum must be array-like"); + else if (!is_integer(data_type->id()) && !is_floating(data_type->id())) + return Status::Invalid("Datum must contain a NumericType"); + + RETURN_NOT_OK(GetMeanKernel(ctx, *data_type, kernel)); + + return kernel->Call(ctx, value, out); +} + +Status Mean(FunctionContext* ctx, const Array& array, Datum* out) { + return Mean(ctx, array.data(), out); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/mean.h b/cpp/src/arrow/compute/kernels/mean.h new file mode 100644 index 00000000000..5074d4e7b7d --- /dev/null +++ b/cpp/src/arrow/compute/kernels/mean.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class DataType; + +namespace compute { + +struct Datum; +class FunctionContext; +class AggregateFunction; + +ARROW_EXPORT +std::shared_ptr MakeMeanAggregateFunction(const DataType& type, + FunctionContext* context); + +/// \brief Compute the mean of a numeric array. +/// +/// \param[in] context the FunctionContext +/// \param[in] value datum to compute the mean, expecting Array +/// \param[out] mean datum of the computed mean as a DoubleScalar +/// +/// \since 0.13.0 +/// \note API not yet finalized +ARROW_EXPORT +Status Mean(FunctionContext* context, const Datum& value, Datum* mean); + +/// \brief Compute the mean of a numeric array. +/// +/// \param[in] context the FunctionContext +/// \param[in] array to compute the mean +/// \param[out] mean datum of the computed mean as a DoubleScalar +/// +/// \since 0.13.0 +/// \note API not yet finalized +ARROW_EXPORT +Status Mean(FunctionContext* context, const Array& array, Datum* mean); + +} // namespace compute +}; // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/sum-internal.h b/cpp/src/arrow/compute/kernels/sum-internal.h new file mode 100644 index 00000000000..a4e7ea63439 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/sum-internal.h @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/aggregate.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/logging.h" + +namespace arrow { + +class Array; +class DataType; + +namespace compute { + +// Find the largest compatible primitive type for a primitive type. +template +struct FindAccumulatorType {}; + +template +struct FindAccumulatorType> { + using Type = Int64Type; +}; + +template +struct FindAccumulatorType> { + using Type = UInt64Type; +}; + +template +struct FindAccumulatorType> { + using Type = DoubleType; +}; + +template +class SumAggregateFunction final : public AggregateFunctionStaticState { + using CType = typename TypeTraits::CType; + using ArrayType = typename TypeTraits::ArrayType; + + // A small number of elements rounded to the next cacheline. This should + // amount to a maximum of 4 cachelines when dealing with 8 bytes elements. + static constexpr int64_t kTinyThreshold = 32; + static_assert(kTinyThreshold >= (2 * CHAR_BIT) + 1, + "ConsumeSparse requires 3 bytes of null bitmap, and 17 is the" + "required minimum number of bits/elements to cover 3 bytes."); + + public: + Status Consume(const Array& input, StateType* state) const override { + const ArrayType& array = static_cast(input); + + if (input.null_count() == 0) { + *state = ConsumeDense(array); + } else if (input.length() <= kTinyThreshold) { + // In order to simplify ConsumeSparse implementation (requires at least 3 + // bytes of bitmap data), small arrays are handled differently. + *state = ConsumeTiny(array); + } else { + *state = ConsumeSparse(array); + } + + return Status::OK(); + } + + Status Merge(const StateType& src, StateType* dst) const override { + *dst += src; + return Status::OK(); + } + + Status Finalize(const StateType& src, Datum* output) const override { + *output = src.Finalize(); + return Status::OK(); + } + + std::shared_ptr out_type() const override { return StateType::out_type(); } + + private: + StateType ConsumeDense(const ArrayType& array) const { + StateType local; + + const auto values = array.raw_values(); + const int64_t length = array.length(); + for (int64_t i = 0; i < length; i++) { + local.sum += values[i]; + } + + local.count = length; + + return local; + } + + StateType ConsumeTiny(const ArrayType& array) const { + StateType local; + + internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), + array.length()); + const auto values = array.raw_values(); + for (int64_t i = 0; i < array.length(); i++) { + if (reader.IsSet()) { + local.sum += values[i]; + local.count++; + } + reader.Next(); + } + + return local; + } + + // While this is not branchless, gcc needs this to be in a different function + // for it to generate cmov which ends to be slightly faster than + // multiplication but safe for handling NaN with doubles. + inline CType MaskedValue(bool valid, CType value) const { return valid ? value : 0; } + + inline StateType UnrolledSum(uint8_t bits, const CType* values) const { + StateType local; + + if (bits < 0xFF) { + // Some nulls + for (size_t i = 0; i < 8; i++) { + local.sum += MaskedValue(bits & (1U << i), values[i]); + } + local.count += BitUtil::kBytePopcount[bits]; + } else { + // No nulls + for (size_t i = 0; i < 8; i++) { + local.sum += values[i]; + } + local.count += 8; + } + + return local; + } + + StateType ConsumeSparse(const ArrayType& array) const { + StateType local; + + // Sliced bitmaps on non-byte positions induce problem with the branchless + // unrolled technique. Thus extra padding is added on both left and right + // side of the slice such that both ends are byte-aligned. The first and + // last bitmap are properly masked to ignore extra values induced by + // padding. + // + // The execution is divided in 3 sections. + // + // 1. Compute the sum of the first masked byte. + // 2. Compute the sum of the middle bytes + // 3. Compute the sum of the last masked byte. + + const int64_t length = array.length(); + const int64_t offset = array.offset(); + + // The number of bytes covering the range, this includes partial bytes. + // This number bounded by `<= (length / 8) + 2`, e.g. a possible extra byte + // on the left, and on the right. + const int64_t covering_bytes = BitUtil::CoveringBytes(offset, length); + DCHECK_GE(covering_bytes, 3); + + // Align values to the first batch of 8 elements. Note that raw_values() is + // already adjusted with the offset, thus we rewind a little to align to + // the closest 8-batch offset. + const auto values = array.raw_values() - (offset % 8); + + // Align bitmap at the first consumable byte. + const auto bitmap = array.null_bitmap_data() + BitUtil::RoundDown(offset, 8) / 8; + + // Consume the first (potentially partial) byte. + const uint8_t first_mask = BitUtil::kTrailingBitmask[offset % 8]; + local += UnrolledSum(bitmap[0] & first_mask, values); + + // Consume the (full) middle bytes. The loop iterates in unit of + // batches of 8 values and 1 byte of bitmap. + for (int64_t i = 1; i < covering_bytes - 1; i++) { + local += UnrolledSum(bitmap[i], &values[i * 8]); + } + + // Consume the last (potentially partial) byte. + const int64_t last_idx = covering_bytes - 1; + const uint8_t last_mask = BitUtil::kPrecedingWrappingBitmask[(offset + length) % 8]; + local += UnrolledSum(bitmap[last_idx] & last_mask, &values[last_idx * 8]); + + return local; + } +}; // namespace compute + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc index 17999411efd..14b999ccb98 100644 --- a/cpp/src/arrow/compute/kernels/sum.cc +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -16,14 +16,7 @@ // under the License. #include "arrow/compute/kernels/sum.h" - -#include "arrow/array.h" -#include "arrow/compute/kernel.h" -#include "arrow/compute/kernels/aggregate.h" -#include "arrow/type_traits.h" -#include "arrow/util/bit-util.h" -#include "arrow/util/logging.h" -#include "arrow/visitor_inline.h" +#include "arrow/compute/kernels/sum-internal.h" namespace arrow { namespace compute { @@ -44,184 +37,30 @@ struct SumState { return *this; } - std::shared_ptr AsScalar() const { + std::shared_ptr Finalize() const { using ScalarType = typename TypeTraits::ScalarType; - return std::make_shared(this->sum); - } - - size_t count = 0; - typename SumType::c_type sum = 0; -}; - -constexpr int64_t CoveringBytes(int64_t offset, int64_t length) { - return (BitUtil::RoundUp(length + offset, 8) - BitUtil::RoundDown(offset, 8)) / 8; -} - -static_assert(CoveringBytes(0, 8) == 1, ""); -static_assert(CoveringBytes(0, 9) == 2, ""); -static_assert(CoveringBytes(1, 7) == 1, ""); -static_assert(CoveringBytes(1, 8) == 2, ""); -static_assert(CoveringBytes(2, 19) == 3, ""); -static_assert(CoveringBytes(7, 18) == 4, ""); - -template > -class SumAggregateFunction final : public AggregateFunctionStaticState { - using CType = typename TypeTraits::CType; - using ArrayType = typename TypeTraits::ArrayType; - - static constexpr int64_t kTinyThreshold = 32; - static_assert(kTinyThreshold > 18, - "ConsumeSparse requires at least 18 elements to fit 3 bytes"); - - public: - Status Consume(const Array& input, StateType* state) const override { - const ArrayType& array = static_cast(input); - if (input.null_count() == 0) { - *state = ConsumeDense(array); - } else if (input.length() <= kTinyThreshold) { - // In order to simplify ConsumeSparse implementation (requires at least 3 - // bytes of bitmap data), small arrays are handled differently. - *state = ConsumeTiny(array); - } else { - *state = ConsumeSparse(array); - } - - return Status::OK(); - } - - Status Merge(const StateType& src, StateType* dst) const override { - *dst += src; - return Status::OK(); - } - - Status Finalize(const StateType& src, Datum* output) const override { - auto boxed = src.AsScalar(); - if (src.count == 0) { + auto boxed = std::make_shared(this->sum); + if (count == 0) { // TODO(wesm): Currently null, but fix this boxed->is_valid = false; } - *output = boxed; - return Status::OK(); - } - - std::shared_ptr out_type() const override { - return TypeTraits::Type>::type_singleton(); - } - - private: - StateType ConsumeDense(const ArrayType& array) const { - StateType local; - - const auto values = array.raw_values(); - const int64_t length = array.length(); - for (int64_t i = 0; i < length; i++) { - local.sum += values[i]; - } - - local.count = length; - - return local; - } - - StateType ConsumeTiny(const ArrayType& array) const { - StateType local; - - internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), - array.length()); - const auto values = array.raw_values(); - for (int64_t i = 0; i < array.length(); i++) { - if (reader.IsSet()) { - local.sum += values[i]; - local.count++; - } - reader.Next(); - } - return local; + return boxed; } - inline StateType UnrolledSum(uint8_t bits, const CType* values) const { - StateType local; - - if (bits < 0xFF) { -#define SUM_SHIFT(ITEM) values[ITEM] * static_cast(((bits >> ITEM) & 1U)) - // Some nulls - local.sum += SUM_SHIFT(0); - local.sum += SUM_SHIFT(1); - local.sum += SUM_SHIFT(2); - local.sum += SUM_SHIFT(3); - local.sum += SUM_SHIFT(4); - local.sum += SUM_SHIFT(5); - local.sum += SUM_SHIFT(6); - local.sum += SUM_SHIFT(7); - local.count += BitUtil::kBytePopcount[bits]; -#undef SUM_SHIFT - } else { - // No nulls - for (size_t i = 0; i < 8; i++) { - local.sum += values[i]; - } - local.count += 8; - } - - return local; + static std::shared_ptr out_type() { + return TypeTraits::type_singleton(); } - StateType ConsumeSparse(const ArrayType& array) const { - StateType local; - - // Sliced bitmaps on non-byte positions induce problem with the branchless - // unrolled technique. Thus extra padding is added on both left and right - // side of the slice such that both ends are byte-aligned. The first and - // last bitmap are properly masked to ignore extra values induced by - // padding. - // - // The execution is divided in 3 sections. - // - // 1. Compute the sum of the first masked byte. - // 2. Compute the sum of the middle bytes - // 3. Compute the sum of the last masked byte. - - const int64_t length = array.length(); - const int64_t offset = array.offset(); - - // The number of bytes covering the range, this includes partial bytes. - // This number bounded by `<= (length / 8) + 2`, e.g. a possible extra byte - // on the left, and on the right. - const int64_t covering_bytes = CoveringBytes(offset, length); - - // Align values to the first batch of 8 elements. Note that raw_values() is - // already adjusted with the offset, thus we rewind a little to align to - // the closest 8-batch offset. - const auto values = array.raw_values() - (offset % 8); - - // Align bitmap at the first consumable byte. - const auto bitmap = array.null_bitmap_data() + BitUtil::RoundDown(offset, 8) / 8; - - // Consume the first (potentially partial) byte. - const uint8_t first_mask = BitUtil::kTrailingBitmask[offset % 8]; - local += UnrolledSum(bitmap[0] & first_mask, values); - - // Consume the (full) middle bytes. The loop iterates in unit of - // batches of 8 values and 1 byte of bitmap. - for (int64_t i = 1; i < covering_bytes - 1; i++) { - local += UnrolledSum(bitmap[i], &values[i * 8]); - } - - // Consume the last (potentially partial) byte. - const int64_t last_idx = covering_bytes - 1; - const uint8_t last_mask = BitUtil::kPrecedingWrappingBitmask[(offset + length) % 8]; - local += UnrolledSum(bitmap[last_idx] & last_mask, &values[last_idx * 8]); - - return local; - } + size_t count = 0; + typename SumType::c_type sum = 0; }; #define SUM_AGG_FN_CASE(T) \ case T::type_id: \ return std::static_pointer_cast( \ - std::make_shared>()); + std::make_shared>>()); std::shared_ptr MakeSumAggregateFunction(const DataType& type, FunctionContext* ctx) { diff --git a/cpp/src/arrow/compute/kernels/sum.h b/cpp/src/arrow/compute/kernels/sum.h index 88da2acea88..e6f95490d7c 100644 --- a/cpp/src/arrow/compute/kernels/sum.h +++ b/cpp/src/arrow/compute/kernels/sum.h @@ -15,49 +15,31 @@ // specific language governing permissions and limitations // under the License. -#ifndef ARROW_COMPUTE_KERNELS_SUM_H -#define ARROW_COMPUTE_KERNELS_SUM_H +#pragma once #include -#include -#include "arrow/status.h" -#include "arrow/type.h" -#include "arrow/type_traits.h" #include "arrow/util/visibility.h" namespace arrow { class Array; class DataType; +class Status; namespace compute { -// Find the largest compatible primitive type for a primitive type. -template -struct FindAccumulatorType { - using Type = DoubleType; -}; - -template -struct FindAccumulatorType::value>::type> { - using Type = Int64Type; -}; - -template -struct FindAccumulatorType::value>::type> { - using Type = UInt64Type; -}; - -template -struct FindAccumulatorType::value>::type> { - using Type = DoubleType; -}; - struct Datum; class FunctionContext; class AggregateFunction; +/// \brief Return a Sum Kernel +/// +/// \param[in] type required to specialize the kernel +/// \param[in] context the FunctionContext +/// +/// \since 0.13.0 +/// \note API not yet finalized ARROW_EXPORT std::shared_ptr MakeSumAggregateFunction(const DataType& type, FunctionContext* context); @@ -86,5 +68,3 @@ Status Sum(FunctionContext* context, const Array& array, Datum* out); } // namespace compute } // namespace arrow - -#endif // ARROW_COMPUTE_KERNELS_CAST_H diff --git a/cpp/src/arrow/compute/test-util.h b/cpp/src/arrow/compute/test-util.h index e90a0343e59..bec54cc3615 100644 --- a/cpp/src/arrow/compute/test-util.h +++ b/cpp/src/arrow/compute/test-util.h @@ -69,6 +69,41 @@ std::shared_ptr _MakeArray(const std::shared_ptr& type, return result; } +template +struct DatumEqual {}; + +template +struct DatumEqual::value>::type> { + static constexpr double kArbitraryDoubleErrorBound = 1.0; + using ScalarType = typename TypeTraits::ScalarType; + + static void EnsureEqual(const Datum& lhs, const Datum& rhs) { + ASSERT_EQ(lhs.kind(), rhs.kind()); + if (lhs.kind() == Datum::SCALAR) { + auto left = internal::checked_cast(lhs.scalar().get()); + auto right = internal::checked_cast(rhs.scalar().get()); + ASSERT_EQ(left->is_valid, right->is_valid); + ASSERT_EQ(left->type->id(), right->type->id()); + ASSERT_NEAR(left->value, right->value, kArbitraryDoubleErrorBound); + } + } +}; + +template +struct DatumEqual::value>::type> { + using ScalarType = typename TypeTraits::ScalarType; + static void EnsureEqual(const Datum& lhs, const Datum& rhs) { + ASSERT_EQ(lhs.kind(), rhs.kind()); + if (lhs.kind() == Datum::SCALAR) { + auto left = internal::checked_cast(lhs.scalar().get()); + auto right = internal::checked_cast(rhs.scalar().get()); + ASSERT_EQ(left->is_valid, right->is_valid); + ASSERT_EQ(left->type->id(), right->type->id()); + ASSERT_EQ(left->value, right->value); + } + } +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/util/bit-util-test.cc b/cpp/src/arrow/util/bit-util-test.cc index e8f32d27bdf..774d3bf6c37 100644 --- a/cpp/src/arrow/util/bit-util-test.cc +++ b/cpp/src/arrow/util/bit-util-test.cc @@ -750,6 +750,15 @@ TEST(BitUtil, RoundDown) { } } +TEST(BitUtil, CoveringBytes) { + EXPECT_EQ(BitUtil::CoveringBytes(0, 8), 1); + EXPECT_EQ(BitUtil::CoveringBytes(0, 9), 2); + EXPECT_EQ(BitUtil::CoveringBytes(1, 7), 1); + EXPECT_EQ(BitUtil::CoveringBytes(1, 8), 2); + EXPECT_EQ(BitUtil::CoveringBytes(2, 19), 3); + EXPECT_EQ(BitUtil::CoveringBytes(7, 18), 4); +} + TEST(BitUtil, TrailingBits) { EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 0), 0); EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 1), 1); diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h index 53b55883d9a..6724c293043 100644 --- a/cpp/src/arrow/util/bit-util.h +++ b/cpp/src/arrow/util/bit-util.h @@ -154,6 +154,21 @@ constexpr int64_t RoundUpToMultipleOf64(int64_t num) { return RoundUpToPowerOf2(num, 64); } +// Returns the number of bytes covering a sliced bitmap. Find the length +// rounded to cover full bytes on both extremities. +// +// The following example represents a slice (offset=10, length=9) +// +// 0 8 16 24 +// |-------|-------|------| +// [ ] (slice) +// [ ] (same slice aligned to bytes bounds, length=16) +// +// The covering bytes is the length (in bytes) of this new aligned slice. +constexpr int64_t CoveringBytes(int64_t offset, int64_t length) { + return (BitUtil::RoundUp(length + offset, 8) - BitUtil::RoundDown(offset, 8)) / 8; +} + // Returns the 'num_bits' least-significant bits of 'v'. static inline uint64_t TrailingBits(uint64_t v, int num_bits) { if (ARROW_PREDICT_FALSE(num_bits == 0)) return 0;