diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index f40fa3798b4..f3aae96a80d 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -365,6 +365,7 @@ if(ARROW_COMPUTE) compute/registry.cc compute/kernels/aggregate_basic.cc compute/kernels/aggregate_mode.cc + compute/kernels/aggregate_var_std.cc compute/kernels/codegen_internal.cc compute/kernels/scalar_arithmetic.cc compute/kernels/scalar_boolean.cc diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 2802b02105d..53ee5b9a2b2 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -45,5 +45,15 @@ Result Mode(const Datum& value, ExecContext* ctx) { return CallFunction("mode", {value}, ctx); } +Result Stddev(const Datum& value, const VarianceOptions& options, + ExecContext* ctx) { + return CallFunction("stddev", {value}, &options, ctx); +} + +Result Variance(const Datum& value, const VarianceOptions& options, + ExecContext* ctx) { + return CallFunction("variance", {value}, &options, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 9528ce3477b..41c6e1d613a 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -76,6 +76,18 @@ struct ARROW_EXPORT MinMaxOptions : public FunctionOptions { enum Mode null_handling = SKIP; }; +/// \brief Control Delta Degrees of Freedom (ddof) of Variance and Stddev kernel +/// +/// The divisor used in calculations is N - ddof, where N is the number of elements. +/// By default, ddof is zero, and population variance or stddev is returned. +struct ARROW_EXPORT VarianceOptions : public FunctionOptions { + explicit VarianceOptions(int ddof = 0) : ddof(ddof) {} + + static VarianceOptions Defaults() { return VarianceOptions{}; } + + int ddof = 0; +}; + /// @} /// \brief Count non-null (or null) values in an array. @@ -145,5 +157,33 @@ Result MinMax(const Datum& value, ARROW_EXPORT Result Mode(const Datum& value, ExecContext* ctx = NULLPTR); +/// \brief Calculate the standard deviation of a numeric array +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see VarianceOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return datum of the computed standard deviation as a DoubleScalar +/// +/// \since 2.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result Stddev(const Datum& value, + const VarianceOptions& options = VarianceOptions::Defaults(), + ExecContext* ctx = NULLPTR); + +/// \brief Calculate the variance of a numeric array +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see VarianceOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return datum of the computed variance as a DoubleScalar +/// +/// \since 2.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result Variance(const Datum& value, + const VarianceOptions& options = VarianceOptions::Defaults(), + ExecContext* ctx = NULLPTR); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index 94914d00f10..562e17485fd 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -241,6 +241,8 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunction(std::move(func))); DCHECK_OK(registry->AddFunction(aggregate::AddModeAggKernels())); + DCHECK_OK(registry->AddFunction(aggregate::AddStddevAggKernels())); + DCHECK_OK(registry->AddFunction(aggregate::AddVarianceAggKernels())); } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index cd8390e1adf..2b0631ee2f2 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -59,6 +59,8 @@ void AddMeanAvx512AggKernels(ScalarAggregateFunction* func); void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func); std::shared_ptr AddModeAggKernels(); +std::shared_ptr AddStddevAggKernels(); +std::shared_ptr AddVarianceAggKernels(); // ---------------------------------------------------------------------- // Sum implementation diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index ee02d197ea2..39b3f8827fb 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -942,5 +942,183 @@ TEST_F(TestInt32ModeKernel, LargeValueRange) { CheckModeWithRange(-10000000, 10000000); } +// +// Variance/Stddev +// + +template +class TestPrimitiveVarStdKernel : public ::testing::Test { + public: + using Traits = TypeTraits; + using ScalarType = typename TypeTraits::ScalarType; + + void AssertVarStdIs(const Array& array, const VarianceOptions& options, + double expected_var, double diff = 0) { + AssertVarStdIsInternal(array, options, expected_var, diff); + } + + void AssertVarStdIs(const std::shared_ptr& array, + const VarianceOptions& options, double expected_var, + double diff = 0) { + AssertVarStdIsInternal(array, options, expected_var, diff); + } + + void AssertVarStdIs(const std::string& json, const VarianceOptions& options, + double expected_var) { + auto array = ArrayFromJSON(type_singleton(), json); + AssertVarStdIs(*array, options, expected_var); + } + + void AssertVarStdIs(const std::vector& json, + const VarianceOptions& options, double expected_var) { + auto chunked = ChunkedArrayFromJSON(type_singleton(), json); + AssertVarStdIs(chunked, options, expected_var); + } + + void AssertVarStdIsInvalid(const Array& array, const VarianceOptions& options) { + AssertVarStdIsInvalidInternal(array, options); + } + + void AssertVarStdIsInvalid(const std::shared_ptr& array, + const VarianceOptions& options) { + AssertVarStdIsInvalidInternal(array, options); + } + + void AssertVarStdIsInvalid(const std::string& json, const VarianceOptions& options) { + auto array = ArrayFromJSON(type_singleton(), json); + AssertVarStdIsInvalid(*array, options); + } + + void AssertVarStdIsInvalid(const std::vector& json, + const VarianceOptions& options) { + auto array = ChunkedArrayFromJSON(type_singleton(), json); + AssertVarStdIsInvalid(array, options); + } + + std::shared_ptr type_singleton() { return Traits::type_singleton(); } + + private: + void AssertVarStdIsInternal(const Datum& array, const VarianceOptions& options, + double expected_var, double diff = 0) { + ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); + ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); + auto var = checked_cast(out_var.scalar().get()); + auto std = checked_cast(out_std.scalar().get()); + ASSERT_TRUE(var->is_valid && std->is_valid); + ASSERT_DOUBLE_EQ(std->value * std->value, var->value); + if (diff == 0) { + ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP + } else { + ASSERT_NEAR(var->value, expected_var, diff); + } + } + + void AssertVarStdIsInvalidInternal(const Datum& array, const VarianceOptions& options) { + ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); + ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); + auto var = checked_cast(out_var.scalar().get()); + auto std = checked_cast(out_std.scalar().get()); + ASSERT_FALSE(var->is_valid || std->is_valid); + } +}; + +template +class TestNumericVarStdKernel : public TestPrimitiveVarStdKernel {}; + +// Reference value from numpy.var +TYPED_TEST_SUITE(TestNumericVarStdKernel, NumericArrowTypes); +TYPED_TEST(TestNumericVarStdKernel, Basics) { + VarianceOptions options; // ddof = 0, population variance/stddev + + this->AssertVarStdIs("[100]", options, 0); + this->AssertVarStdIs("[1, 2, 3]", options, 0.6666666666666666); + this->AssertVarStdIs("[null, 1, 2, null, 3]", options, 0.6666666666666666); + + std::vector chunks; + chunks = {"[]", "[1]", "[2]", "[null]", "[3]"}; + this->AssertVarStdIs(chunks, options, 0.6666666666666666); + chunks = {"[1, 2, 3]", "[4, 5, 6]", "[7, 8]"}; + this->AssertVarStdIs(chunks, options, 5.25); + chunks = {"[1, 2, 3, 4, 5, 6, 7]", "[8]"}; + this->AssertVarStdIs(chunks, options, 5.25); + + this->AssertVarStdIsInvalid("[null, null, null]", options); + this->AssertVarStdIsInvalid("[]", options); + this->AssertVarStdIsInvalid("[]", options); + + options.ddof = 1; // sample variance/stddev + + this->AssertVarStdIs("[1, 2]", options, 0.5); + + chunks = {"[1]", "[2]"}; + this->AssertVarStdIs(chunks, options, 0.5); + chunks = {"[1, 2, 3]", "[4, 5, 6]", "[7, 8]"}; + this->AssertVarStdIs(chunks, options, 6.0); + chunks = {"[1, 2, 3, 4, 5, 6, 7]", "[8]"}; + this->AssertVarStdIs(chunks, options, 6.0); + + this->AssertVarStdIsInvalid("[100]", options); + this->AssertVarStdIsInvalid("[100, null, null]", options); + chunks = {"[100]", "[null]", "[]"}; + this->AssertVarStdIsInvalid(chunks, options); +} + +class TestVarStdKernelStability : public TestPrimitiveVarStdKernel {}; + +// Test numerical stability +TEST_F(TestVarStdKernelStability, Basics) { + VarianceOptions options{1}; // ddof = 1 + this->AssertVarStdIs("[100000004, 100000007, 100000013, 100000016]", options, 30.0); + this->AssertVarStdIs("[1000000004, 1000000007, 1000000013, 1000000016]", options, 30.0); +} + +// Calculate reference variance with Welford's online algorithm +// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm +std::pair WelfordVar(const Array& array) { + const auto& array_numeric = reinterpret_cast(array); + const auto values = array_numeric.raw_values(); + internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); + double count = 0, mean = 0, m2 = 0; + for (int64_t i = 0; i < array.length(); ++i) { + if (reader.IsSet()) { + ++count; + double delta = values[i] - mean; + mean += delta / count; + double delta2 = values[i] - mean; + m2 += delta * delta2; + } + reader.Next(); + } + return std::make_pair(m2 / count, m2 / (count - 1)); +} + +class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel {}; + +TEST_F(TestVarStdKernelRandom, Basics) { + // Cut array into small chunks + constexpr int array_size = 5000; + constexpr int chunk_size_max = 50; + constexpr int chunk_count = array_size / chunk_size_max; + + auto rand = random::RandomArrayGenerator(0x5487656); + auto array = rand.Numeric(array_size, -10000.0, 100000.0, 0.1); + auto chunk_size_array = rand.Numeric(chunk_count, 0, chunk_size_max); + const int* chunk_size = chunk_size_array->data()->GetValues(1); + int total_size = 0; + + ArrayVector array_vector; + for (int i = 0; i < chunk_count; ++i) { + array_vector.emplace_back(array->Slice(total_size, chunk_size[i])); + total_size += chunk_size[i]; + } + auto chunked = *ChunkedArray::Make(array_vector); + + double var_population, var_sample; + std::tie(var_population, var_sample) = WelfordVar(*(array->Slice(0, total_size))); + + this->AssertVarStdIs(chunked, VarianceOptions{0}, var_population, 0.0001); + this->AssertVarStdIs(chunked, VarianceOptions{1}, var_sample, 0.0001); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc new file mode 100644 index 00000000000..e2b98bb38fc --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/kernels/aggregate_basic_internal.h" + +namespace arrow { +namespace compute { +namespace aggregate { + +namespace { + +template +struct VarStdState { + using ArrayType = typename TypeTraits::ArrayType; + using c_type = typename ArrowType::c_type; + using ThisType = VarStdState; + + // Calculate `m2` (sum((X-mean)^2)) of one chunk with `two pass algorithm` + // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm + // Always use `double` to calculate variance for any array type + void Consume(const ArrayType& array) { + int64_t count = array.length() - array.null_count(); + if (count == 0) { + return; + } + + double sum = 0; + VisitArrayDataInline( + *array.data(), [&sum](c_type value) { sum += static_cast(value); }, + []() {}); + + double mean = sum / count, m2 = 0; + VisitArrayDataInline( + *array.data(), + [mean, &m2](c_type value) { + double v = static_cast(value); + m2 += (v - mean) * (v - mean); + }, + []() {}); + + this->count = count; + this->sum = sum; + this->m2 = m2; + } + + // Combine `m2` from two chunks + // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm + void MergeFrom(const ThisType& state) { + if (state.count == 0) { + return; + } + if (this->count == 0) { + this->count = state.count; + this->sum = state.sum; + this->m2 = state.m2; + return; + } + double delta = this->sum / this->count - state.sum / state.count; + this->m2 += state.m2 + + delta * delta * this->count * state.count / (this->count + state.count); + this->count += state.count; + this->sum += state.sum; + } + + int64_t count = 0; + double sum = 0; + double m2 = 0; // sum((X-mean)^2) +}; + +enum class VarOrStd : bool { Var, Std }; + +template +struct VarStdImpl : public ScalarAggregator { + using ThisType = VarStdImpl; + using ArrayType = typename TypeTraits::ArrayType; + + explicit VarStdImpl(const std::shared_ptr& out_type, + const VarianceOptions& options, VarOrStd return_type) + : out_type(out_type), options(options), return_type(return_type) {} + + void Consume(KernelContext*, const ExecBatch& batch) override { + ArrayType array(batch[0].array()); + this->state.Consume(array); + } + + void MergeFrom(KernelContext*, KernelState&& src) override { + const auto& other = checked_cast(src); + this->state.MergeFrom(other.state); + } + + void Finalize(KernelContext*, Datum* out) override { + if (this->state.count <= options.ddof) { + out->value = std::make_shared(); + } else { + double var = this->state.m2 / (this->state.count - options.ddof); + out->value = + std::make_shared(return_type == VarOrStd::Var ? var : sqrt(var)); + } + } + + std::shared_ptr out_type; + VarStdState state; + VarianceOptions options; + VarOrStd return_type; +}; + +struct VarStdInitState { + std::unique_ptr state; + KernelContext* ctx; + const DataType& in_type; + const std::shared_ptr& out_type; + const VarianceOptions& options; + VarOrStd return_type; + + VarStdInitState(KernelContext* ctx, const DataType& in_type, + const std::shared_ptr& out_type, + const VarianceOptions& options, VarOrStd return_type) + : ctx(ctx), + in_type(in_type), + out_type(out_type), + options(options), + return_type(return_type) {} + + Status Visit(const DataType&) { + return Status::NotImplemented("No variance/stddev implemented"); + } + + Status Visit(const HalfFloatType&) { + return Status::NotImplemented("No variance/stddev implemented"); + } + + template + enable_if_t::value, Status> Visit(const Type&) { + state.reset(new VarStdImpl(out_type, options, return_type)); + return Status::OK(); + } + + std::unique_ptr Create() { + ctx->SetStatus(VisitTypeInline(in_type, this)); + return std::move(state); + } +}; + +std::unique_ptr StddevInit(KernelContext* ctx, const KernelInitArgs& args) { + VarStdInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options), VarOrStd::Std); + return visitor.Create(); +} + +std::unique_ptr VarianceInit(KernelContext* ctx, + const KernelInitArgs& args) { + VarStdInitState visitor( + ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(), + static_cast(*args.options), VarOrStd::Var); + return visitor.Create(); +} + +void AddVarStdKernels(KernelInit init, + const std::vector>& types, + ScalarAggregateFunction* func) { + for (const auto& ty : types) { + auto sig = KernelSignature::Make({InputType::Array(ty)}, float64()); + AddAggKernel(std::move(sig), init, func); + } +} + +} // namespace + +std::shared_ptr AddStddevAggKernels() { + static auto default_std_options = VarianceOptions::Defaults(); + auto func = std::make_shared("stddev", Arity::Unary(), + &default_std_options); + AddVarStdKernels(StddevInit, internal::NumericTypes(), func.get()); + return func; +} + +std::shared_ptr AddVarianceAggKernels() { + static auto default_var_options = VarianceOptions::Defaults(); + auto func = std::make_shared("variance", Arity::Unary(), + &default_var_options); + AddVarStdKernels(VarianceInit, internal::NumericTypes(), func.get()); + return func; +} + +} // namespace aggregate +} // namespace compute +} // namespace arrow diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 3a0abe39a38..6ef10abf67d 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -140,8 +140,12 @@ Aggregations +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ | mode | Unary | Numeric | Scalar Struct (2) | | +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ +| stddev | Unary | Numeric | Scalar Float64 | :struct:`VarianceOptions` | ++--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ | sum | Unary | Numeric | Scalar Numeric (3) | | +--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ +| variance | Unary | Numeric | Scalar Float64 | :struct:`VarianceOptions` | ++--------------------------+------------+--------------------+-----------------------+--------------------------------------------+ Notes: