diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h index ed29f26f2c3..3f5ba39d30e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h @@ -55,6 +55,10 @@ struct ScalarAggregator : public KernelState { virtual Status Finalize(KernelContext* ctx, Datum* out) = 0; }; +// Helper to differentiate between var/std calculation so we can fold +// kernel implementations together +enum class VarOrStd : bool { Var, Std }; + void AddAggKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateFunction* func, SimdLevel::type simd_level = SimdLevel::NONE); diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc index d6965fed4a3..6fa49d03d76 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc @@ -19,6 +19,7 @@ #include "arrow/compute/api_aggregate.h" #include "arrow/compute/kernels/aggregate_internal.h" +#include "arrow/compute/kernels/aggregate_var_std_internal.h" #include "arrow/compute/kernels/common.h" #include "arrow/util/bit_run_reader.h" #include "arrow/util/int128_internal.h" @@ -85,32 +86,22 @@ struct VarStdState { valid_count -= count; if (count > 0) { - int64_t sum = 0; - int128_t square_sum = 0; + IntegerVarStd var_std; const ArrayData& data = *slice->data(); const CType* values = data.GetValues(1); VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { const auto value = values[pos + i]; - sum += value; - square_sum += static_cast(value) * value; + var_std.ConsumeOne(value); } }); - const double mean = static_cast(sum) / count; - // calculate m2 = square_sum - sum * sum / count - // decompose `sum * sum / count` into integers and fractions - const int128_t sum_square = static_cast(sum) * sum; - const int128_t integers = sum_square / count; - const double fractions = static_cast(sum_square % count) / count; - const double m2 = static_cast(square_sum - integers) - fractions; - // merge variance ThisType state; - state.count = count; - state.mean = mean; - state.m2 = m2; + state.count = var_std.count; + state.mean = var_std.mean(); + state.m2 = var_std.m2(); this->MergeFrom(state); } } @@ -128,12 +119,8 @@ struct VarStdState { this->m2 = state.m2; return; } - double mean = (this->mean * this->count + state.mean * state.count) / - (this->count + state.count); - this->m2 += state.m2 + this->count * (this->mean - mean) * (this->mean - mean) + - state.count * (state.mean - mean) * (state.mean - mean); - this->count += state.count; - this->mean = mean; + MergeVarStd(this->count, this->mean, state.count, state.mean, state.m2, &this->count, + &this->mean, &this->m2); } int64_t count = 0; @@ -141,8 +128,6 @@ struct VarStdState { double m2 = 0; // m2 = count*s2 = sum((X-mean)^2) }; -enum class VarOrStd : bool { Var, Std }; - template struct VarStdImpl : public ScalarAggregator { using ThisType = VarStdImpl; diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h new file mode 100644 index 00000000000..675ebfd91d3 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/int128_internal.h" + +namespace arrow { +namespace compute { +namespace internal { + +using arrow::internal::int128_t; + +// Accumulate sum/squared sum (using naive summation) +// Shared implementation between scalar/hash aggregate variance/stddev kernels +template +struct IntegerVarStd { + using c_type = typename ArrowType::c_type; + + int64_t count = 0; + int64_t sum = 0; + int128_t square_sum = 0; + + void ConsumeOne(const c_type value) { + sum += value; + square_sum += static_cast(value) * value; + count++; + } + + double mean() const { return static_cast(sum) / count; } + + double m2() const { + // calculate m2 = square_sum - sum * sum / count + // decompose `sum * sum / count` into integers and fractions + const int128_t sum_square = static_cast(sum) * sum; + const int128_t integers = sum_square / count; + const double fractions = static_cast(sum_square % count) / count; + return static_cast(square_sum - integers) - fractions; + } +}; + +static inline void MergeVarStd(int64_t count1, double mean1, int64_t count2, double mean2, + double m22, int64_t* out_count, double* out_mean, + double* out_m2) { + double mean = (mean1 * count1 + mean2 * count2) / (count1 + count2); + *out_m2 += m22 + count1 * (mean1 - mean) * (mean1 - mean) + + count2 * (mean2 - mean) * (mean2 - mean); + *out_count += count2; + *out_mean = mean; +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 3e4b401bae9..472ae956388 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include #include @@ -34,12 +35,14 @@ #include "arrow/compute/exec_internal.h" #include "arrow/compute/kernel.h" #include "arrow/compute/kernels/aggregate_internal.h" +#include "arrow/compute/kernels/aggregate_var_std_internal.h" #include "arrow/compute/kernels/common.h" #include "arrow/util/bit_run_reader.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/bitmap_writer.h" #include "arrow/util/checked_cast.h" #include "arrow/util/cpu_info.h" +#include "arrow/util/int128_internal.h" #include "arrow/util/make_unique.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" @@ -917,6 +920,8 @@ struct GroupedSumImpl : public GroupedAggregator { auto sums = reinterpret_cast(sums_.mutable_data()); auto counts = reinterpret_cast(counts_.mutable_data()); + // XXX this uses naive summation; we should switch to pairwise summation as was + // done for the scalar aggregate kernel in ARROW-11758 auto g = batch[1].array()->GetValues(1); VisitArrayDataInline( *batch[0].array(), @@ -1005,6 +1010,307 @@ struct GroupedSumFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// Mean implementation + +template +struct GroupedMeanImpl : public GroupedSumImpl { + Result Finalize() override { + using SumType = typename GroupedSumImpl::SumType; + std::shared_ptr null_bitmap; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr values, + AllocateBuffer(num_groups_ * sizeof(double), pool_)); + int64_t null_count = 0; + + const int64_t* counts = reinterpret_cast(counts_.data()); + const auto* sums = reinterpret_cast(sums_.data()); + double* means = reinterpret_cast(values->mutable_data()); + for (int64_t i = 0; i < num_groups_; ++i) { + if (counts[i] > 0) { + means[i] = static_cast(sums[i]) / counts[i]; + continue; + } + means[i] = 0; + + if (null_bitmap == nullptr) { + ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_)); + BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true); + } + + null_count += 1; + BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false); + } + + return ArrayData::Make(float64(), num_groups_, + {std::move(null_bitmap), std::move(values)}, null_count); + } + + std::shared_ptr out_type() const override { return float64(); } + + using GroupedSumImpl::num_groups_; + using GroupedSumImpl::pool_; + using GroupedSumImpl::counts_; + using GroupedSumImpl::sums_; +}; + +struct GroupedMeanFactory { + template ::Type> + Status Visit(const T&) { + kernel = MakeKernel(std::move(argument_type), HashAggregateInit>); + return Status::OK(); + } + + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Computing mean of type ", type); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Computing mean of type ", type); + } + + static Result Make(const std::shared_ptr& type) { + GroupedMeanFactory factory; + factory.argument_type = InputType::Array(type); + RETURN_NOT_OK(VisitTypeInline(*type, &factory)); + return std::move(factory.kernel); + } + + HashAggregateKernel kernel; + InputType argument_type; +}; + +// Variance/Stdev implementation + +using arrow::internal::int128_t; + +template +struct GroupedVarStdImpl : public GroupedAggregator { + using CType = typename Type::c_type; + + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + options_ = *checked_cast(options); + ctx_ = ctx; + pool_ = ctx->memory_pool(); + counts_ = BufferBuilder(pool_); + means_ = BufferBuilder(pool_); + m2s_ = BufferBuilder(pool_); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = new_num_groups; + RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0)); + RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0)); + RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0)); + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); } + + // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm` + // (see aggregate_var_std.cc) + template + enable_if_t::value || (sizeof(CType) > 4), Status> ConsumeImpl( + const ExecBatch& batch) { + using SumType = + typename std::conditional::value, double, int128_t>::type; + + int64_t* counts = reinterpret_cast(counts_.mutable_data()); + double* means = reinterpret_cast(means_.mutable_data()); + double* m2s = reinterpret_cast(m2s_.mutable_data()); + + // XXX this uses naive summation; we should switch to pairwise summation as was + // done for the scalar aggregate kernel in ARROW-11567 + std::vector sums(num_groups_); + auto g = batch[1].array()->GetValues(1); + VisitArrayDataInline( + *batch[0].array(), + [&](typename TypeTraits::CType value) { + sums[*g] += value; + counts[*g] += 1; + ++g; + }, + [&] { ++g; }); + + for (int64_t i = 0; i < num_groups_; i++) { + means[i] = static_cast(sums[i]) / counts[i]; + } + + g = batch[1].array()->GetValues(1); + VisitArrayDataInline( + *batch[0].array(), + [&](typename TypeTraits::CType value) { + const double v = static_cast(value); + m2s[*g] += (v - means[*g]) * (v - means[*g]); + ++g; + }, + [&] { ++g; }); + + return Status::OK(); + } + + // int32/16/8: textbook one pass algorithm with integer arithmetic (see + // aggregate_var_std.cc) + template + enable_if_t::value && (sizeof(CType) <= 4), Status> ConsumeImpl( + const ExecBatch& batch) { + // max number of elements that sum will not overflow int64 (2Gi int32 elements) + // for uint32: 0 <= sum < 2^63 (int64 >= 0) + // for int32: -2^62 <= sum < 2^62 + constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8); + + const auto& array = *batch[0].array(); + const auto g = batch[1].array()->GetValues(1); + + std::vector> var_std(num_groups_); + + ARROW_ASSIGN_OR_RAISE(auto mapping, + AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_)); + for (uint32_t i = 0; static_cast(i) < num_groups_; i++) { + reinterpret_cast(mapping->mutable_data())[i] = i; + } + ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)}, + /*null_count=*/0); + + const CType* values = array.GetValues(1); + + for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) { + // process in chunks that overflow will never happen + + // reset state + var_std.clear(); + var_std.resize(num_groups_); + GroupedVarStdImpl state; + RETURN_NOT_OK(state.Init(ctx_, &options_)); + RETURN_NOT_OK(state.Resize(num_groups_)); + int64_t* other_counts = reinterpret_cast(state.counts_.mutable_data()); + double* other_means = reinterpret_cast(state.means_.mutable_data()); + double* other_m2s = reinterpret_cast(state.m2s_.mutable_data()); + + arrow::internal::VisitSetBitRunsVoid( + array.buffers[0], array.offset + start_index, + std::min(max_length, batch.length - start_index), + [&](int64_t pos, int64_t len) { + for (int64_t i = 0; i < len; ++i) { + const int64_t index = start_index + pos + i; + const auto value = values[index]; + var_std[g[index]].ConsumeOne(value); + } + }); + + for (int64_t i = 0; i < num_groups_; i++) { + if (var_std[i].count == 0) continue; + + other_counts[i] = var_std[i].count; + other_means[i] = var_std[i].mean(); + other_m2s[i] = var_std[i].m2(); + } + RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping)); + } + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + // Combine m2 from two chunks (see aggregate_var_std.cc) + auto other = checked_cast(&raw_other); + + auto counts = reinterpret_cast(counts_.mutable_data()); + auto means = reinterpret_cast(means_.mutable_data()); + auto m2s = reinterpret_cast(m2s_.mutable_data()); + + const auto* other_counts = reinterpret_cast(other->counts_.data()); + const auto* other_means = reinterpret_cast(other->means_.data()); + const auto* other_m2s = reinterpret_cast(other->m2s_.data()); + + auto g = group_id_mapping.GetValues(1); + for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { + if (other_counts[other_g] == 0) continue; + MergeVarStd(counts[*g], means[*g], other_counts[other_g], other_means[other_g], + other_m2s[other_g], &counts[*g], &means[*g], &m2s[*g]); + } + return Status::OK(); + } + + Result Finalize() override { + std::shared_ptr null_bitmap; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr values, + AllocateBuffer(num_groups_ * sizeof(double), pool_)); + int64_t null_count = 0; + + double* results = reinterpret_cast(values->mutable_data()); + const int64_t* counts = reinterpret_cast(counts_.data()); + const double* m2s = reinterpret_cast(m2s_.data()); + for (int64_t i = 0; i < num_groups_; ++i) { + if (counts[i] > options_.ddof) { + const double variance = m2s[i] / (counts[i] - options_.ddof); + results[i] = result_type_ == VarOrStd::Var ? variance : std::sqrt(variance); + continue; + } + + results[i] = 0; + if (null_bitmap == nullptr) { + ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_)); + BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true); + } + + null_count += 1; + BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false); + } + + return ArrayData::Make(float64(), num_groups_, + {std::move(null_bitmap), std::move(values)}, null_count); + } + + std::shared_ptr out_type() const override { return float64(); } + + VarOrStd result_type_; + VarianceOptions options_; + int64_t num_groups_ = 0; + // m2 = count * s2 = sum((X-mean)^2) + BufferBuilder counts_, means_, m2s_; + ExecContext* ctx_; + MemoryPool* pool_; +}; + +template +Result> VarStdInit(KernelContext* ctx, + const KernelInitArgs& args) { + auto impl = ::arrow::internal::make_unique>(); + impl->result_type_ = result_type; + RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options)); + return std::move(impl); +} + +template +struct GroupedVarStdFactory { + template ::value || + is_floating_type::value>> + Status Visit(const T&) { + kernel = MakeKernel(std::move(argument_type), VarStdInit); + return Status::OK(); + } + + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Computing variance/stddev of data of type ", type); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Computing variance/stddev of data of type ", type); + } + + static Result Make(const std::shared_ptr& type) { + GroupedVarStdFactory factory; + factory.argument_type = InputType::Array(type); + RETURN_NOT_OK(VisitTypeInline(*type, &factory)); + return std::move(factory.kernel); + } + + HashAggregateKernel kernel; + InputType argument_type; +}; + // ---------------------------------------------------------------------- // MinMax implementation @@ -1537,6 +1843,26 @@ const FunctionDoc hash_sum_doc{"Sum values of a numeric array", ("Null values are ignored."), {"array", "group_id_array"}}; +const FunctionDoc hash_mean_doc{"Average values of a numeric array", + ("Null values are ignored."), + {"array", "group_id_array"}}; + +const FunctionDoc hash_stddev_doc{ + "Calculate the standard deviation of a numeric array", + ("The number of degrees of freedom can be controlled using VarianceOptions.\n" + "By default (`ddof` = 0), the population standard deviation is calculated.\n" + "Nulls are ignored. If there are not enough non-null values in the array\n" + "to satisfy `ddof`, null is returned."), + {"array", "group_id_array"}}; + +const FunctionDoc hash_variance_doc{ + "Calculate the variance of a numeric array", + ("The number of degrees of freedom can be controlled using VarianceOptions.\n" + "By default (`ddof` = 0), the population variance is calculated.\n" + "Nulls are ignored. If there are not enough non-null values in the array\n" + "to satisfy `ddof`, null is returned."), + {"array", "group_id_array"}}; + const FunctionDoc hash_min_max_doc{ "Compute the minimum and maximum values of a numeric array", ("Null values are ignored by default.\n" @@ -1576,6 +1902,43 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunction(std::move(func))); } + { + auto func = std::make_shared("hash_mean", Arity::Binary(), + &hash_mean_doc); + DCHECK_OK(AddHashAggKernels({boolean()}, GroupedMeanFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(SignedIntTypes(), GroupedMeanFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(UnsignedIntTypes(), GroupedMeanFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(FloatingPointTypes(), GroupedMeanFactory::Make, func.get())); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + + static auto default_variance_options = VarianceOptions::Defaults(); + { + auto func = std::make_shared( + "hash_stddev", Arity::Binary(), &hash_stddev_doc, &default_variance_options); + DCHECK_OK(AddHashAggKernels(SignedIntTypes(), + GroupedVarStdFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(UnsignedIntTypes(), + GroupedVarStdFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(FloatingPointTypes(), + GroupedVarStdFactory::Make, func.get())); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + + { + auto func = std::make_shared( + "hash_variance", Arity::Binary(), &hash_variance_doc, &default_variance_options); + DCHECK_OK(AddHashAggKernels(SignedIntTypes(), + GroupedVarStdFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(UnsignedIntTypes(), + GroupedVarStdFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(FloatingPointTypes(), + GroupedVarStdFactory::Make, func.get())); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + { static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); auto func = std::make_shared( diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 46c7716abce..f4df6aa18a3 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -653,6 +653,167 @@ TEST(GroupBy, SumOnly) { } } +TEST(GroupBy, MeanOnly) { + for (bool use_threads : {true, false}) { + SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); + + auto table = + TableFromJSON(schema({field("argument", float64()), field("key", int64())}), {R"([ + [1.0, 1], + [null, 1] + ])", + R"([ + [0.0, 2], + [null, 3], + [4.0, null], + [3.25, 1], + [0.125, 2] + ])", + R"([ + [-0.25, 2], + [0.75, null], + [null, 3] + ])"}); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + internal::GroupBy({table->GetColumnByName("argument")}, + {table->GetColumnByName("key")}, + { + {"hash_mean", nullptr}, + }, + use_threads)); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsApproxEqual(ArrayFromJSON(struct_({ + field("hash_mean", float64()), + field("key_0", int64()), + }), + R"([ + [2.125, 1], + [-0.041666666666666664, 2], + [null, 3], + [2.375, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + } +} + +TEST(GroupBy, VarianceAndStddev) { + auto batch = RecordBatchFromJSON( + schema({field("argument", int32()), field("key", int64())}), R"([ + [1, 1], + [null, 1], + [0, 2], + [null, 3], + [4, null], + [3, 1], + [0, 2], + [-1, 2], + [1, null], + [null, 3] + ])"); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + internal::GroupBy( + { + batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), + }, + { + batch->GetColumnByName("key"), + }, + { + {"hash_variance", nullptr}, + {"hash_stddev", nullptr}, + })); + + AssertDatumsApproxEqual(ArrayFromJSON(struct_({ + field("hash_variance", float64()), + field("hash_stddev", float64()), + field("key_0", int64()), + }), + R"([ + [1.0, 1.0, 1], + [0.22222222222222224, 0.4714045207910317, 2], + [null, null, 3], + [2.25, 1.5, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + + batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("key", int64())}), R"([ + [1.0, 1], + [null, 1], + [0.0, 2], + [null, 3], + [4.0, null], + [3.0, 1], + [0.0, 2], + [-1.0, 2], + [1.0, null], + [null, 3] + ])"); + + ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, internal::GroupBy( + { + batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), + }, + { + batch->GetColumnByName("key"), + }, + { + {"hash_variance", nullptr}, + {"hash_stddev", nullptr}, + })); + + AssertDatumsApproxEqual(ArrayFromJSON(struct_({ + field("hash_variance", float64()), + field("hash_stddev", float64()), + field("key_0", int64()), + }), + R"([ + [1.0, 1.0, 1], + [0.22222222222222224, 0.4714045207910317, 2], + [null, null, 3], + [2.25, 1.5, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + + // Test ddof + VarianceOptions variance_options(/*ddof=*/2); + ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, + internal::GroupBy( + { + batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), + }, + { + batch->GetColumnByName("key"), + }, + { + {"hash_variance", &variance_options}, + {"hash_stddev", &variance_options}, + })); + + AssertDatumsApproxEqual(ArrayFromJSON(struct_({ + field("hash_variance", float64()), + field("hash_stddev", float64()), + field("key_0", int64()), + }), + R"([ + [null, null, 1], + [0.6666666666666667, 0.816496580927726, 2], + [null, null, 3], + [null, null, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); +} + TEST(GroupBy, MinMaxOnly) { for (bool use_exec_plan : {false, true}) { for (bool use_threads : {true, false}) {