Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 87 additions & 13 deletions cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1058,23 +1058,57 @@ TYPED_TEST(TestNumericVarStdKernel, Basics) {
this->AssertVarStdIsInvalid(chunks, options);
}

class TestVarStdKernelStability : public TestPrimitiveVarStdKernel<DoubleType> {};

// Test numerical stability
TEST_F(TestVarStdKernelStability, Basics) {
template <typename ArrowType>
class TestVarStdKernelStability : public TestPrimitiveVarStdKernel<ArrowType> {};

typedef ::testing::Types<Int32Type, UInt32Type, Int64Type, UInt64Type, DoubleType>
VarStdStabilityTypes;

TYPED_TEST_SUITE(TestVarStdKernelStability, VarStdStabilityTypes);
TYPED_TEST(TestVarStdKernelStability, Basics) {
VarianceOptions options{1}; // ddof = 1
this->AssertVarStdIs("[100000004, 100000007, 100000013, 100000016]", options, 30.0);
this->AssertVarStdIs("[1000000004, 1000000007, 1000000013, 1000000016]", options, 30.0);
if (!is_unsigned_integer_type<TypeParam>::value) {
this->AssertVarStdIs("[-1000000016, -1000000013, -1000000007, -1000000004]", options,
30.0);
}
}

// Test numerical stability of variance merging code
class TestVarStdKernelMergeStability : public TestPrimitiveVarStdKernel<DoubleType> {};

TEST_F(TestVarStdKernelMergeStability, Basics) {
VarianceOptions options{1}; // ddof = 1

#ifndef __MINGW32__ // MinGW has precision issues
// This test is to make sure our variance combining method is stable.
// XXX: The reference value from numpy is actually wrong due to floating
// point limits. The correct result should equals variance(90, 0) = 4050.
std::vector<std::string> chunks = {"[40000008000000490]", "[40000008000000400]"};
this->AssertVarStdIs(chunks, options, 3904.0);
#endif
}

// Test integer arithmetic code
class TestVarStdKernelInt32 : public TestPrimitiveVarStdKernel<Int32Type> {};

TEST_F(TestVarStdKernelInt32, Basics) {
VarianceOptions options{1};
this->AssertVarStdIs("[-2147483648, -2147483647, -2147483646]", options, 1.0);
this->AssertVarStdIs("[2147483645, 2147483646, 2147483647]", options, 1.0);
this->AssertVarStdIs("[-2147483648, -2147483648, 2147483647]", options,
6.148914688373205e+18);
}

class TestVarStdKernelUInt32 : public TestPrimitiveVarStdKernel<UInt32Type> {};

TEST_F(TestVarStdKernelUInt32, Basics) {
VarianceOptions options{1};
this->AssertVarStdIs("[4294967293, 4294967294, 4294967295]", options, 1.0);
this->AssertVarStdIs("[0, 0, 4294967295]", options, 6.148914688373205e+18);
}

// https://en.wikipedia.org/wiki/Kahan_summation_algorithm
void KahanSum(double& sum, double& adjust, double addend) {
double y = addend - adjust;
Expand All @@ -1085,35 +1119,51 @@ void KahanSum(double& sum, double& adjust, double addend) {

// Calculate reference variance with Welford's online algorithm + Kahan summation
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
std::pair<double, double> WelfordVar(const Array& array) {
const auto& array_numeric = reinterpret_cast<const DoubleArray&>(array);
const auto values = array_numeric.raw_values();
// XXX: not stable for long array with very small `stddev / average`
template <typename ArrayType>
std::pair<double, double> WelfordVar(const ArrayType& array) {
const auto values = array.raw_values();
internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length());
double count = 0, mean = 0, m2 = 0;
double mean_adjust = 0, m2_adjust = 0;
for (int64_t i = 0; i < array.length(); ++i) {
if (reader.IsSet()) {
++count;
double delta = values[i] - mean;
double delta = static_cast<double>(values[i]) - mean;
KahanSum(mean, mean_adjust, delta / count);
double delta2 = values[i] - mean;
double delta2 = static_cast<double>(values[i]) - mean;
KahanSum(m2, m2_adjust, delta * delta2);
}
reader.Next();
}
return std::make_pair(m2 / count, m2 / (count - 1));
}

class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel<DoubleType> {};
// Test random chunked array
template <typename ArrowType>
class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel<ArrowType> {};

typedef ::testing::Types<Int32Type, UInt32Type, Int64Type, UInt64Type, FloatType,
DoubleType>
VarStdRandomTypes;

TEST_F(TestVarStdKernelRandom, Basics) {
TYPED_TEST_SUITE(TestVarStdKernelRandom, VarStdRandomTypes);
TYPED_TEST(TestVarStdKernelRandom, Basics) {
// Cut array into small chunks
constexpr int array_size = 5000;
constexpr int chunk_size_max = 50;
constexpr int chunk_count = array_size / chunk_size_max;

std::shared_ptr<Array> array;
auto rand = random::RandomArrayGenerator(0x5487656);
auto array = rand.Numeric<DoubleType>(array_size, -10000.0, 100000.0, 0.1);
if (is_floating_type<TypeParam>::value) {
array = rand.Numeric<TypeParam>(array_size, -10000.0, 100000.0, 0.1);
} else {
using CType = typename TypeParam::c_type;
constexpr CType min = std::numeric_limits<CType>::min();
constexpr CType max = std::numeric_limits<CType>::max();
array = rand.Numeric<TypeParam>(array_size, min, max, 0.1);
}
auto chunk_size_array = rand.Numeric<Int32Type>(chunk_count, 0, chunk_size_max);
const int* chunk_size = chunk_size_array->data()->GetValues<int>(1);
int total_size = 0;
Expand All @@ -1126,11 +1176,35 @@ TEST_F(TestVarStdKernelRandom, Basics) {
auto chunked = *ChunkedArray::Make(array_vector);

double var_population, var_sample;
std::tie(var_population, var_sample) = WelfordVar(*(array->Slice(0, total_size)));
using ArrayType = typename TypeTraits<TypeParam>::ArrayType;
auto typed_array = std::static_pointer_cast<ArrayType>(array->Slice(0, total_size));
std::tie(var_population, var_sample) = WelfordVar(*typed_array);

this->AssertVarStdIs(chunked, VarianceOptions{0}, var_population);
this->AssertVarStdIs(chunked, VarianceOptions{1}, var_sample);
}

// This test is too heavy to run in CI, should be checked manually
#if 0
class TestVarStdKernelIntegerLength : public TestPrimitiveVarStdKernel<Int32Type> {};

TEST_F(TestVarStdKernelIntegerLength, Basics) {
constexpr int32_t min = std::numeric_limits<int32_t>::min();
constexpr int32_t max = std::numeric_limits<int32_t>::max();
auto rand = random::RandomArrayGenerator(0x5487657);
// large data volume
auto array = rand.Numeric<Int32Type>(4000000000, min, max, 0.1);
// biased distribution
// auto array = rand.Numeric<Int32Type>(4000000000, min, min + 100000, 0.1);

double var_population, var_sample;
auto int32_array = std::static_pointer_cast<Int32Array>(array);
std::tie(var_population, var_sample) = WelfordVar(*int32_array);

this->AssertVarStdIs(*array, VarianceOptions{0}, var_population);
this->AssertVarStdIs(*array, VarianceOptions{1}, var_sample);
}
#endif

} // namespace compute
} // namespace arrow
70 changes: 62 additions & 8 deletions cpp/src/arrow/compute/kernels/aggregate_var_std.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,43 @@
// under the License.

#include "arrow/compute/kernels/aggregate_basic_internal.h"
#include "arrow/util/int128_internal.h"

namespace arrow {
namespace compute {
namespace aggregate {

namespace {

using arrow::internal::int128_t;

template <typename ArrowType>
struct VarStdState {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using c_type = typename ArrowType::c_type;
using CType = typename ArrowType::c_type;
using ThisType = VarStdState<ArrowType>;

// Calculate `m2` (sum((X-mean)^2)) of one chunk with `two pass algorithm`
// float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm
// Always use `double` to calculate variance for any array type
void Consume(const ArrayType& array) {
template <typename T = ArrowType>
enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4)> Consume(
const ArrayType& array) {
int64_t count = array.length() - array.null_count();
if (count == 0) {
return;
}

double sum = 0;
using SumType =
typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
SumType sum = 0;
VisitArrayDataInline<ArrowType>(
*array.data(), [&sum](c_type value) { sum += static_cast<double>(value); },
*array.data(), [&sum](CType value) { sum += static_cast<SumType>(value); },
[]() {});

double mean = sum / count, m2 = 0;
double mean = static_cast<double>(sum) / count, m2 = 0;
VisitArrayDataInline<ArrowType>(
*array.data(),
[mean, &m2](c_type value) {
[mean, &m2](CType value) {
double v = static_cast<double>(value);
m2 += (v - mean) * (v - mean);
},
Expand All @@ -57,6 +63,54 @@ struct VarStdState {
this->m2 = m2;
}

// int32/16/8: textbook one pass algorithm with integer arithmetic
template <typename T = ArrowType>
enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4)> Consume(
const ArrayType& array) {
// max number of elements that sum will not overflow int64 (2Gi int32 elements)
// for uint32: 0 <= sum < 2^63 (int64 >= 0)
// for int32: -2^62 <= sum < 2^62
constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);

int64_t start_index = 0;
int64_t valid_count = array.length() - array.null_count();

while (valid_count > 0) {
// process in chunks that overflow will never happen
const auto slice = array.Slice(start_index, max_length);
const int64_t count = slice->length() - slice->null_count();
start_index += max_length;
valid_count -= count;

if (count > 0) {
int64_t sum = 0;
int128_t square_sum = 0;
VisitArrayDataInline<ArrowType>(
*slice->data(),
[&sum, &square_sum](CType value) {
sum += value;
square_sum += static_cast<uint64_t>(value) * value;
},
[]() {});

const double mean = static_cast<double>(sum) / count;
// calculate m2 = square_sum - sum * sum / count
// decompose `sum * sum / count` into integers and fractions
const int128_t sum_square = static_cast<int128_t>(sum) * sum;
const int128_t integers = sum_square / count;
const double fractions = static_cast<double>(sum_square % count) / count;
const double m2 = static_cast<double>(square_sum - integers) - fractions;

// merge variance
ThisType state;
state.count = count;
state.mean = mean;
state.m2 = m2;
this->MergeFrom(state);
}
}
}

// Combine `m2` from two chunks (m2 = n*s2)
// https://www.emathzone.com/tutorials/basic-statistics/combined-variance.html
void MergeFrom(const ThisType& state) {
Expand Down