From ce6ea4570e3a2dab49199ee9e7899d0f0c1ce912 Mon Sep 17 00:00:00 2001 From: Yibo Cai Date: Mon, 12 Oct 2020 08:38:21 +0000 Subject: [PATCH] ARROW-10304: [C++][Compute] Optimize variance kernel for integers Improve variance kernel performance for integers by leveraging textbook one pass algorithm and integer arithmetic. --- .../arrow/compute/kernels/aggregate_test.cc | 100 +++++++++++++++--- .../compute/kernels/aggregate_var_std.cc | 70 ++++++++++-- 2 files changed, 149 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 6d97e79a23f..bcaa842fa7f 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1058,16 +1058,31 @@ TYPED_TEST(TestNumericVarStdKernel, Basics) { this->AssertVarStdIsInvalid(chunks, options); } -class TestVarStdKernelStability : public TestPrimitiveVarStdKernel {}; - // Test numerical stability -TEST_F(TestVarStdKernelStability, Basics) { +template +class TestVarStdKernelStability : public TestPrimitiveVarStdKernel {}; + +typedef ::testing::Types + VarStdStabilityTypes; + +TYPED_TEST_SUITE(TestVarStdKernelStability, VarStdStabilityTypes); +TYPED_TEST(TestVarStdKernelStability, Basics) { VarianceOptions options{1}; // ddof = 1 this->AssertVarStdIs("[100000004, 100000007, 100000013, 100000016]", options, 30.0); this->AssertVarStdIs("[1000000004, 1000000007, 1000000013, 1000000016]", options, 30.0); + if (!is_unsigned_integer_type::value) { + this->AssertVarStdIs("[-1000000016, -1000000013, -1000000007, -1000000004]", options, + 30.0); + } +} + +// Test numerical stability of variance merging code +class TestVarStdKernelMergeStability : public TestPrimitiveVarStdKernel {}; + +TEST_F(TestVarStdKernelMergeStability, Basics) { + VarianceOptions options{1}; // ddof = 1 #ifndef __MINGW32__ // MinGW has precision issues - // This test is to make sure our variance combining method is stable. // XXX: The reference value from numpy is actually wrong due to floating // point limits. The correct result should equals variance(90, 0) = 4050. std::vector chunks = {"[40000008000000490]", "[40000008000000400]"}; @@ -1075,6 +1090,25 @@ TEST_F(TestVarStdKernelStability, Basics) { #endif } +// Test integer arithmetic code +class TestVarStdKernelInt32 : public TestPrimitiveVarStdKernel {}; + +TEST_F(TestVarStdKernelInt32, Basics) { + VarianceOptions options{1}; + this->AssertVarStdIs("[-2147483648, -2147483647, -2147483646]", options, 1.0); + this->AssertVarStdIs("[2147483645, 2147483646, 2147483647]", options, 1.0); + this->AssertVarStdIs("[-2147483648, -2147483648, 2147483647]", options, + 6.148914688373205e+18); +} + +class TestVarStdKernelUInt32 : public TestPrimitiveVarStdKernel {}; + +TEST_F(TestVarStdKernelUInt32, Basics) { + VarianceOptions options{1}; + this->AssertVarStdIs("[4294967293, 4294967294, 4294967295]", options, 1.0); + this->AssertVarStdIs("[0, 0, 4294967295]", options, 6.148914688373205e+18); +} + // https://en.wikipedia.org/wiki/Kahan_summation_algorithm void KahanSum(double& sum, double& adjust, double addend) { double y = addend - adjust; @@ -1085,18 +1119,19 @@ void KahanSum(double& sum, double& adjust, double addend) { // Calculate reference variance with Welford's online algorithm + Kahan summation // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm -std::pair WelfordVar(const Array& array) { - const auto& array_numeric = reinterpret_cast(array); - const auto values = array_numeric.raw_values(); +// XXX: not stable for long array with very small `stddev / average` +template +std::pair WelfordVar(const ArrayType& array) { + const auto values = array.raw_values(); internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); double count = 0, mean = 0, m2 = 0; double mean_adjust = 0, m2_adjust = 0; for (int64_t i = 0; i < array.length(); ++i) { if (reader.IsSet()) { ++count; - double delta = values[i] - mean; + double delta = static_cast(values[i]) - mean; KahanSum(mean, mean_adjust, delta / count); - double delta2 = values[i] - mean; + double delta2 = static_cast(values[i]) - mean; KahanSum(m2, m2_adjust, delta * delta2); } reader.Next(); @@ -1104,16 +1139,31 @@ std::pair WelfordVar(const Array& array) { return std::make_pair(m2 / count, m2 / (count - 1)); } -class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel {}; +// Test random chunked array +template +class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel {}; + +typedef ::testing::Types + VarStdRandomTypes; -TEST_F(TestVarStdKernelRandom, Basics) { +TYPED_TEST_SUITE(TestVarStdKernelRandom, VarStdRandomTypes); +TYPED_TEST(TestVarStdKernelRandom, Basics) { // Cut array into small chunks constexpr int array_size = 5000; constexpr int chunk_size_max = 50; constexpr int chunk_count = array_size / chunk_size_max; + std::shared_ptr array; auto rand = random::RandomArrayGenerator(0x5487656); - auto array = rand.Numeric(array_size, -10000.0, 100000.0, 0.1); + if (is_floating_type::value) { + array = rand.Numeric(array_size, -10000.0, 100000.0, 0.1); + } else { + using CType = typename TypeParam::c_type; + constexpr CType min = std::numeric_limits::min(); + constexpr CType max = std::numeric_limits::max(); + array = rand.Numeric(array_size, min, max, 0.1); + } auto chunk_size_array = rand.Numeric(chunk_count, 0, chunk_size_max); const int* chunk_size = chunk_size_array->data()->GetValues(1); int total_size = 0; @@ -1126,11 +1176,35 @@ TEST_F(TestVarStdKernelRandom, Basics) { auto chunked = *ChunkedArray::Make(array_vector); double var_population, var_sample; - std::tie(var_population, var_sample) = WelfordVar(*(array->Slice(0, total_size))); + using ArrayType = typename TypeTraits::ArrayType; + auto typed_array = std::static_pointer_cast(array->Slice(0, total_size)); + std::tie(var_population, var_sample) = WelfordVar(*typed_array); this->AssertVarStdIs(chunked, VarianceOptions{0}, var_population); this->AssertVarStdIs(chunked, VarianceOptions{1}, var_sample); } +// This test is too heavy to run in CI, should be checked manually +#if 0 +class TestVarStdKernelIntegerLength : public TestPrimitiveVarStdKernel {}; + +TEST_F(TestVarStdKernelIntegerLength, Basics) { + constexpr int32_t min = std::numeric_limits::min(); + constexpr int32_t max = std::numeric_limits::max(); + auto rand = random::RandomArrayGenerator(0x5487657); + // large data volume + auto array = rand.Numeric(4000000000, min, max, 0.1); + // biased distribution + // auto array = rand.Numeric(4000000000, min, min + 100000, 0.1); + + double var_population, var_sample; + auto int32_array = std::static_pointer_cast(array); + std::tie(var_population, var_sample) = WelfordVar(*int32_array); + + this->AssertVarStdIs(*array, VarianceOptions{0}, var_population); + this->AssertVarStdIs(*array, VarianceOptions{1}, var_sample); +} +#endif + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc index 40980dbbcaa..4bffd5a754b 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc @@ -16,6 +16,7 @@ // under the License. #include "arrow/compute/kernels/aggregate_basic_internal.h" +#include "arrow/util/int128_internal.h" namespace arrow { namespace compute { @@ -23,30 +24,35 @@ namespace aggregate { namespace { +using arrow::internal::int128_t; + template struct VarStdState { using ArrayType = typename TypeTraits::ArrayType; - using c_type = typename ArrowType::c_type; + using CType = typename ArrowType::c_type; using ThisType = VarStdState; - // Calculate `m2` (sum((X-mean)^2)) of one chunk with `two pass algorithm` + // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm` // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm - // Always use `double` to calculate variance for any array type - void Consume(const ArrayType& array) { + template + enable_if_t::value || (sizeof(CType) > 4)> Consume( + const ArrayType& array) { int64_t count = array.length() - array.null_count(); if (count == 0) { return; } - double sum = 0; + using SumType = + typename std::conditional::value, double, int128_t>::type; + SumType sum = 0; VisitArrayDataInline( - *array.data(), [&sum](c_type value) { sum += static_cast(value); }, + *array.data(), [&sum](CType value) { sum += static_cast(value); }, []() {}); - double mean = sum / count, m2 = 0; + double mean = static_cast(sum) / count, m2 = 0; VisitArrayDataInline( *array.data(), - [mean, &m2](c_type value) { + [mean, &m2](CType value) { double v = static_cast(value); m2 += (v - mean) * (v - mean); }, @@ -57,6 +63,54 @@ struct VarStdState { this->m2 = m2; } + // int32/16/8: textbook one pass algorithm with integer arithmetic + template + enable_if_t::value && (sizeof(CType) <= 4)> Consume( + const ArrayType& array) { + // max number of elements that sum will not overflow int64 (2Gi int32 elements) + // for uint32: 0 <= sum < 2^63 (int64 >= 0) + // for int32: -2^62 <= sum < 2^62 + constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8); + + int64_t start_index = 0; + int64_t valid_count = array.length() - array.null_count(); + + while (valid_count > 0) { + // process in chunks that overflow will never happen + const auto slice = array.Slice(start_index, max_length); + const int64_t count = slice->length() - slice->null_count(); + start_index += max_length; + valid_count -= count; + + if (count > 0) { + int64_t sum = 0; + int128_t square_sum = 0; + VisitArrayDataInline( + *slice->data(), + [&sum, &square_sum](CType value) { + sum += value; + square_sum += static_cast(value) * value; + }, + []() {}); + + const double mean = static_cast(sum) / count; + // calculate m2 = square_sum - sum * sum / count + // decompose `sum * sum / count` into integers and fractions + const int128_t sum_square = static_cast(sum) * sum; + const int128_t integers = sum_square / count; + const double fractions = static_cast(sum_square % count) / count; + const double m2 = static_cast(square_sum - integers) - fractions; + + // merge variance + ThisType state; + state.count = count; + state.mean = mean; + state.m2 = m2; + this->MergeFrom(state); + } + } + } + // Combine `m2` from two chunks (m2 = n*s2) // https://www.emathzone.com/tutorials/basic-statistics/combined-variance.html void MergeFrom(const ThisType& state) {