From 565c88e363d773b2afe3991903c8e070a4f19b60 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 9 Jul 2021 09:09:31 -0400
Subject: [PATCH 1/7] ARROW-13295: [C++] add hash_mean, hash_variance,
hash_stddev kernels
---
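Note on the merge step: each group's variance state is kept as (count, mean, m2) and partial results are combined with the standard parallel variance update, the same formula the scalar kernel uses. A minimal standalone sketch of that combination step, with illustrative names rather than the kernel's actual types:

    #include <cstdint>

    struct Moments {
      int64_t count = 0;
      double mean = 0;
      double m2 = 0;  // sum((x - mean)^2)
    };

    // Combine two partial (count, mean, m2) states; assumes a.count + b.count > 0.
    Moments MergeMoments(const Moments& a, const Moments& b) {
      Moments out;
      out.count = a.count + b.count;
      out.mean = (a.mean * a.count + b.mean * b.count) / out.count;
      out.m2 = a.m2 + b.m2 + a.count * (a.mean - out.mean) * (a.mean - out.mean) +
               b.count * (b.mean - out.mean) * (b.mean - out.mean);
      return out;
    }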
.../arrow/compute/kernels/hash_aggregate.cc | 380 ++++++++++++++++++
.../compute/kernels/hash_aggregate_test.cc | 131 ++++++
2 files changed, 511 insertions(+)
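For the 8/16/32-bit integer paths, m2 is computed exactly with integer arithmetic and only converted to double at the end; the `sum * sum / count` term is split into an integer quotient and a fractional remainder so no precision is lost. A rough standalone sketch of that calculation (using GCC/Clang `__int128` as a stand-in for arrow::internal::int128_t; it assumes count is small enough that the int64 sum cannot overflow, which the kernel guarantees by processing in bounded chunks):

    #include <cstdint>

    using int128 = __int128;  // stand-in for arrow::internal::int128_t

    double IntegerM2(const int32_t* values, int64_t count) {
      int64_t sum = 0;
      int128 square_sum = 0;
      for (int64_t i = 0; i < count; ++i) {
        sum += values[i];
        square_sum += static_cast<int128>(values[i]) * values[i];
      }
      // m2 = square_sum - sum * sum / count, with the division decomposed into
      // an integer quotient and a fractional remainder to preserve precision.
      const int128 sum_square = static_cast<int128>(sum) * sum;
      const int128 integers = sum_square / count;
      const double fractions = static_cast<double>(sum_square % count) / count;
      return static_cast<double>(square_sum - integers) - fractions;
    }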
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 3e4b401bae9..5018466bf48 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <cmath>
#include
#include
#include
@@ -40,6 +41,7 @@
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/cpu_info.h"
+#include "arrow/util/int128_internal.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
@@ -1005,6 +1007,327 @@ struct GroupedSumFactory {
InputType argument_type;
};
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+ Result<Datum> Finalize() override {
+ using SumType = typename GroupedSumImpl<Type>::SumType;
+ std::shared_ptr<Buffer> null_bitmap;
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+ AllocateBuffer(num_groups_ * sizeof(double), pool_));
+ int64_t null_count = 0;
+
+ const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+ const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+ double* means = reinterpret_cast<double*>(values->mutable_data());
+ for (int64_t i = 0; i < num_groups_; ++i) {
+ if (counts[i] > 0) {
+ means[i] = sums[i] / counts[i];
+ continue;
+ }
+ means[i] = 0;
+
+ if (null_bitmap == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+ BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+ }
+
+ null_count += 1;
+ BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+ }
+
+ return ArrayData::Make(float64(), num_groups_,
+ {std::move(null_bitmap), std::move(values)}, null_count);
+ }
+
+ std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+ using GroupedSumImpl<Type>::num_groups_;
+ using GroupedSumImpl<Type>::pool_;
+ using GroupedSumImpl<Type>::counts_;
+ using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+ template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+ Status Visit(const T&) {
+ kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<AccType>>);
+ return Status::OK();
+ }
+
+ Status Visit(const HalfFloatType& type) {
+ return Status::NotImplemented("Computing mean of type ", type);
+ }
+
+ Status Visit(const DataType& type) {
+ return Status::NotImplemented("Computing mean of type ", type);
+ }
+
+ static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+ GroupedMeanFactory factory;
+ factory.argument_type = InputType::Array(type);
+ RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+ return std::move(factory.kernel);
+ }
+
+ HashAggregateKernel kernel;
+ InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+enum class VarOrStd : bool { Var, Std };
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+ using CType = typename Type::c_type;
+
+ Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+ options_ = *checked_cast<const VarianceOptions*>(options);
+ ctx_ = ctx;
+ pool_ = ctx->memory_pool();
+ counts_ = BufferBuilder(pool_);
+ means_ = BufferBuilder(pool_);
+ m2s_ = BufferBuilder(pool_);
+ return Status::OK();
+ }
+
+ Status Resize(int64_t new_num_groups) override {
+ auto added_groups = new_num_groups - num_groups_;
+ num_groups_ = new_num_groups;
+ RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+ RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+ RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+ return Status::OK();
+ }
+
+ Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+ // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+ // (see aggregate_var_std.cc)
+ template <typename T = Type>
+ enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+ const ExecBatch& batch) {
+ using SumType =
+ typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+ int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+ double* means = reinterpret_cast<double*>(means_.mutable_data());
+ double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+ // XXX this uses naive summation; we should switch to pairwise summation as was
+ // done for the scalar aggregate kernel in ARROW-11567
+ std::vector<SumType> sums(num_groups_);
+ auto g = batch[1].array()->GetValues<uint32_t>(1);
+ VisitArrayDataInline(
+ *batch[0].array(),
+ [&](typename TypeTraits<Type>::CType value) {
+ sums[*g] += value;
+ counts[*g] += 1;
+ ++g;
+ },
+ [&] { ++g; });
+
+ for (int64_t i = 0; i < num_groups_; i++) {
+ means[i] = static_cast(sums[i]) / counts[i];
+ }
+
+ g = batch[1].array()->GetValues<uint32_t>(1);
+ VisitArrayDataInline(
+ *batch[0].array(),
+ [&](typename TypeTraits<Type>::CType value) {
+ const double v = static_cast<double>(value);
+ m2s[*g] += (v - means[*g]) * (v - means[*g]);
+ ++g;
+ },
+ [&] { ++g; });
+
+ return Status::OK();
+ }
+
+ // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+ // aggregate_var_std.cc)
+ template <typename T = Type>
+ enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+ const ExecBatch& batch) {
+ // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+ // for uint32: 0 <= sum < 2^63 (int64 >= 0)
+ // for int32: -2^62 <= sum < 2^62
+ constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+ const auto& array = *batch[0].array();
+ const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+ std::vector<int64_t> sum(num_groups_);
+ std::vector<int128_t> square_sum(num_groups_);
+
+ ARROW_ASSIGN_OR_RAISE(auto mapping,
+ AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+ for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+ reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+ }
+ ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+ /*null_count=*/0);
+
+ const CType* values = array.GetValues<CType>(1);
+
+ for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+ // process in chunks so that overflow can never happen
+
+ // reset state
+ std::fill(sum.begin(), sum.end(), 0);
+ std::fill(square_sum.begin(), square_sum.end(), 0);
+ GroupedVarStdImpl state;
+ RETURN_NOT_OK(state.Init(ctx_, &options_));
+ RETURN_NOT_OK(state.Resize(num_groups_));
+ int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+ double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+ double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+ arrow::internal::VisitSetBitRunsVoid(
+ array.buffers[0], array.offset + start_index,
+ std::min(max_length, batch.length - start_index),
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ const int64_t index = start_index + pos + i;
+ const auto value = values[index];
+ sum[g[index]] += value;
+ square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+ other_counts[g[index]]++;
+ }
+ });
+
+ for (int64_t i = 0; i < num_groups_; i++) {
+ if (other_counts[i] == 0) continue;
+
+ const double mean = static_cast<double>(sum[i]) / other_counts[i];
+ // calculate m2 = square_sum - sum * sum / count
+ // decompose `sum * sum / count` into integers and fractions
+ const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+ const int128_t integers = sum_square / other_counts[i];
+ const double fractions =
+ static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+ const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+ other_means[i] = mean;
+ other_m2s[i] = m2;
+ }
+ RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+ }
+ return Status::OK();
+ }
+
+ Status Merge(GroupedAggregator&& raw_other,
+ const ArrayData& group_id_mapping) override {
+ // Combine m2 from two chunks (see aggregate_var_std.cc)
+ auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+ auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+ auto means = reinterpret_cast<double*>(means_.mutable_data());
+ auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+ const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+ const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+ const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+ auto g = group_id_mapping.GetValues<uint32_t>(1);
+ for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+ if (other_counts[other_g] == 0) continue;
+ const double mean =
+ (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+ (counts[*g] + other_counts[other_g]);
+ m2s[*g] += other_m2s[other_g] +
+ counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+ other_counts[other_g] * (other_means[other_g] - mean) *
+ (other_means[other_g] - mean);
+ counts[*g] += other_counts[other_g];
+ means[*g] = mean;
+ }
+ return Status::OK();
+ }
+
+ Result<Datum> Finalize() override {
+ std::shared_ptr<Buffer> null_bitmap;
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+ AllocateBuffer(num_groups_ * sizeof(double), pool_));
+ int64_t null_count = 0;
+
+ double* results = reinterpret_cast<double*>(values->mutable_data());
+ const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+ const double* m2s = reinterpret_cast<const double*>(m2s_.data());
+ for (int64_t i = 0; i < num_groups_; ++i) {
+ if (counts[i] > options_.ddof) {
+ const double variance = m2s[i] / (counts[i] - options_.ddof);
+ results[i] = result_type_ == VarOrStd::Var ? variance : std::sqrt(variance);
+ continue;
+ }
+
+ results[i] = 0;
+ if (null_bitmap == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+ BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+ }
+
+ null_count += 1;
+ BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+ }
+
+ return ArrayData::Make(float64(), num_groups_,
+ {std::move(null_bitmap), std::move(values)}, null_count);
+ }
+
+ std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+ VarOrStd result_type_;
+ VarianceOptions options_;
+ int64_t num_groups_ = 0;
+ // m2 = count * s2 = sum((X-mean)^2)
+ BufferBuilder counts_, means_, m2s_;
+ ExecContext* ctx_;
+ MemoryPool* pool_;
+};
+
+template <typename Type, VarOrStd result_type>
+Result<std::unique_ptr<KernelState>> VarStdInit(KernelContext* ctx,
+ const KernelInitArgs& args) {
+ auto impl = ::arrow::internal::make_unique<GroupedVarStdImpl<Type>>();
+ impl->result_type_ = result_type;
+ RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options));
+ return std::move(impl);
+}
+
+template <VarOrStd result_type>
+struct GroupedVarStdFactory {
+ template <typename T, typename Enable = enable_if_t<is_integer_type<T>::value ||
+ is_floating_type<T>::value>>
+ Status Visit(const T&) {
+ kernel = MakeKernel(std::move(argument_type), VarStdInit<T, result_type>);
+ return Status::OK();
+ }
+
+ Status Visit(const HalfFloatType& type) {
+ return Status::NotImplemented("Summing data of type ", type);
+ }
+
+ Status Visit(const DataType& type) {
+ return Status::NotImplemented("Summing data of type ", type);
+ }
+
+ static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+ GroupedVarStdFactory factory;
+ factory.argument_type = InputType::Array(type);
+ RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+ return std::move(factory.kernel);
+ }
+
+ HashAggregateKernel kernel;
+ InputType argument_type;
+};
+
// ----------------------------------------------------------------------
// MinMax implementation
@@ -1537,6 +1860,26 @@ const FunctionDoc hash_sum_doc{"Sum values of a numeric array",
("Null values are ignored."),
{"array", "group_id_array"}};
+const FunctionDoc hash_mean_doc{"Average values of a numeric array",
+ ("Null values are ignored."),
+ {"array", "group_id_array"}};
+
+const FunctionDoc hash_stddev_doc{
+ "Calculate the standard deviation of a numeric array",
+ ("The number of degrees of freedom can be controlled using VarianceOptions.\n"
+ "By default (`ddof` = 0), the population standard deviation is calculated.\n"
+ "Nulls are ignored. If there are not enough non-null values in the array\n"
+ "to satisfy `ddof`, null is returned."),
+ {"array", "group_id_array"}};
+
+const FunctionDoc hash_variance_doc{
+ "Calculate the variance of a numeric array",
+ ("The number of degrees of freedom can be controlled using VarianceOptions.\n"
+ "By default (`ddof` = 0), the population variance is calculated.\n"
+ "Nulls are ignored. If there are not enough non-null values in the array\n"
+ "to satisfy `ddof`, null is returned."),
+ {"array", "group_id_array"}};
+
const FunctionDoc hash_min_max_doc{
"Compute the minimum and maximum values of a numeric array",
("Null values are ignored by default.\n"
@@ -1576,6 +1919,43 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));
}
+ {
+ auto func = std::make_shared<HashAggregateFunction>("hash_mean", Arity::Binary(),
+ &hash_mean_doc);
+ DCHECK_OK(AddHashAggKernels({boolean()}, GroupedMeanFactory::Make, func.get()));
+ DCHECK_OK(AddHashAggKernels(SignedIntTypes(), GroupedMeanFactory::Make, func.get()));
+ DCHECK_OK(
+ AddHashAggKernels(UnsignedIntTypes(), GroupedMeanFactory::Make, func.get()));
+ DCHECK_OK(
+ AddHashAggKernels(FloatingPointTypes(), GroupedMeanFactory::Make, func.get()));
+ DCHECK_OK(registry->AddFunction(std::move(func)));
+ }
+
+ static auto default_variance_options = VarianceOptions::Defaults();
+ {
+ auto func = std::make_shared<HashAggregateFunction>(
+ "hash_stddev", Arity::Binary(), &hash_stddev_doc, &default_variance_options);
+ DCHECK_OK(AddHashAggKernels(SignedIntTypes(),
+ GroupedVarStdFactory<VarOrStd::Std>::Make, func.get()));
+ DCHECK_OK(AddHashAggKernels(UnsignedIntTypes(),
+ GroupedVarStdFactory<VarOrStd::Std>::Make, func.get()));
+ DCHECK_OK(AddHashAggKernels(FloatingPointTypes(),
+ GroupedVarStdFactory<VarOrStd::Std>::Make, func.get()));
+ DCHECK_OK(registry->AddFunction(std::move(func)));
+ }
+
+ {
+ auto func = std::make_shared<HashAggregateFunction>(
+ "hash_variance", Arity::Binary(), &hash_variance_doc, &default_variance_options);
+ DCHECK_OK(AddHashAggKernels(SignedIntTypes(),
+ GroupedVarStdFactory<VarOrStd::Var>::Make, func.get()));
+ DCHECK_OK(AddHashAggKernels(UnsignedIntTypes(),
+ GroupedVarStdFactory<VarOrStd::Var>::Make, func.get()));
+ DCHECK_OK(AddHashAggKernels(FloatingPointTypes(),
+ GroupedVarStdFactory<VarOrStd::Var>::Make, func.get()));
+ DCHECK_OK(registry->AddFunction(std::move(func)));
+ }
+
{
static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults();
auto func = std::make_shared<HashAggregateFunction>(
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 46c7716abce..d9a7623244c 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -653,6 +653,137 @@ TEST(GroupBy, SumOnly) {
}
}
+TEST(GroupBy, MeanOnly) {
+ for (bool use_threads : {true, false}) {
+ SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+
+ auto table =
+ TableFromJSON(schema({field("argument", float64()), field("key", int64())}), {R"([
+ [1.0, 1],
+ [null, 1]
+ ])",
+ R"([
+ [0.0, 2],
+ [null, 3],
+ [4.0, null],
+ [3.25, 1],
+ [0.125, 2]
+ ])",
+ R"([
+ [-0.25, 2],
+ [0.75, null],
+ [null, 3]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+ internal::GroupBy({table->GetColumnByName("argument")},
+ {table->GetColumnByName("key")},
+ {
+ {"hash_mean", nullptr},
+ },
+ use_threads));
+ SortBy({"key_0"}, &aggregated_and_grouped);
+
+ AssertDatumsEqual(ArrayFromJSON(struct_({
+ field("hash_mean", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
+ [2.125, 1],
+ [-0.041666666666666664, 2],
+ [null, 3],
+ [2.375, null]
+ ])"),
+ aggregated_and_grouped,
+ /*verbose=*/true);
+ }
+}
+
+TEST(GroupBy, VarianceAndStddev) {
+ auto batch = RecordBatchFromJSON(
+ schema({field("argument", int32()), field("key", int64())}), R"([
+ [1, 1],
+ [null, 1],
+ [0, 2],
+ [null, 3],
+ [4, null],
+ [3, 1],
+ [0, 2],
+ [-1, 2],
+ [1, null],
+ [null, 3]
+ ])");
+
+ ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+ internal::GroupBy(
+ {
+ batch->GetColumnByName("argument"),
+ batch->GetColumnByName("argument"),
+ },
+ {
+ batch->GetColumnByName("key"),
+ },
+ {
+ {"hash_variance", nullptr},
+ {"hash_stddev", nullptr},
+ }));
+
+ AssertDatumsEqual(ArrayFromJSON(struct_({
+ field("hash_variance", float64()),
+ field("hash_stddev", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
+ [1.0, 1.0, 1],
+ [0.22222222222222224, 0.4714045207910317, 2],
+ [null, null, 3],
+ [2.25, 1.5, null]
+ ])"),
+ aggregated_and_grouped,
+ /*verbose=*/true);
+
+ batch = RecordBatchFromJSON(
+ schema({field("argument", float64()), field("key", int64())}), R"([
+ [1.0, 1],
+ [null, 1],
+ [0.0, 2],
+ [null, 3],
+ [4.0, null],
+ [3.0, 1],
+ [0.0, 2],
+ [-1.0, 2],
+ [1.0, null],
+ [null, 3]
+ ])");
+
+ ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, internal::GroupBy(
+ {
+ batch->GetColumnByName("argument"),
+ batch->GetColumnByName("argument"),
+ },
+ {
+ batch->GetColumnByName("key"),
+ },
+ {
+ {"hash_variance", nullptr},
+ {"hash_stddev", nullptr},
+ }));
+
+ AssertDatumsEqual(ArrayFromJSON(struct_({
+ field("hash_variance", float64()),
+ field("hash_stddev", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
+ [1.0, 1.0, 1],
+ [0.22222222222222224, 0.4714045207910317, 2],
+ [null, null, 3],
+ [2.25, 1.5, null]
+ ])"),
+ aggregated_and_grouped,
+ /*verbose=*/true);
+}
+
TEST(GroupBy, MinMaxOnly) {
for (bool use_exec_plan : {false, true}) {
for (bool use_threads : {true, false}) {
From 0f23df9b1962251c64e0f2f999e588b538ad0ae3 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 23 Jul 2021 11:10:26 -0400
Subject: [PATCH 2/7] ARROW-13295: [C++] Avoid redefining enum
---
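With the enum in aggregate_internal.h, both the scalar and the hash kernels can key a single templated implementation off it instead of duplicating the finalization math. Roughly the idea (an illustrative sketch, not the kernel code; assumes count > ddof):

    #include <cmath>
    #include <cstdint>

    enum class VarOrStd : bool { Var, Std };

    // One finalizer serves both functions, selected at compile time.
    template <VarOrStd kind>
    double FinalizeOne(double m2, int64_t count, int64_t ddof) {
      const double variance = m2 / (count - ddof);
      return kind == VarOrStd::Var ? variance : std::sqrt(variance);
    }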
cpp/src/arrow/compute/kernels/aggregate_internal.h | 4 ++++
cpp/src/arrow/compute/kernels/aggregate_var_std.cc | 2 --
cpp/src/arrow/compute/kernels/hash_aggregate.cc | 2 --
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h
index ed29f26f2c3..3f5ba39d30e 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h
@@ -55,6 +55,10 @@ struct ScalarAggregator : public KernelState {
virtual Status Finalize(KernelContext* ctx, Datum* out) = 0;
};
+// Helper to differentiate between var/std calculation so we can fold
+// kernel implementations together
+enum class VarOrStd : bool { Var, Std };
+
void AddAggKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE);
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index d6965fed4a3..17f30c9b86b 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -141,8 +141,6 @@ struct VarStdState {
double m2 = 0; // m2 = count*s2 = sum((X-mean)^2)
};
-enum class VarOrStd : bool { Var, Std };
-
template
struct VarStdImpl : public ScalarAggregator {
using ThisType = VarStdImpl;
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 5018466bf48..be91e0a2bc4 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -1078,8 +1078,6 @@ struct GroupedMeanFactory {
// Variance/Stdev implementation
-enum class VarOrStd : bool { Var, Std };
-
using arrow::internal::int128_t;
template <typename Type>
From 1c53cd9b5a1cd994498cef939e1391a1d98bf738 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 23 Jul 2021 12:14:47 -0400
Subject: [PATCH 3/7] ARROW-13295: [C++] Fix some casts, use ApproxEquals
---
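AssertDatumsApproxEqual is needed because floating-point addition is not associative: the serial and parallel/merged execution paths can accumulate the per-group sums in different orders and differ in the last bit. A classic standalone illustration (not the test's data):

    #include <cstdio>

    int main() {
      const double left = (0.1 + 0.2) + 0.3;   // 0.60000000000000009...
      const double right = 0.1 + (0.2 + 0.3);  // 0.59999999999999998...
      std::printf("%d\n", left == right);      // prints 0: bit-exact equality fails
      return 0;
    }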
cpp/src/arrow/compute/kernels/hash_aggregate.cc | 2 +-
.../arrow/compute/kernels/hash_aggregate_test.cc | 14 +++++++-------
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index be91e0a2bc4..b4d4fc09f13 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -1024,7 +1024,7 @@ struct GroupedMeanImpl : public GroupedSumImpl {
double* means = reinterpret_cast<double*>(values->mutable_data());
for (int64_t i = 0; i < num_groups_; ++i) {
if (counts[i] > 0) {
- means[i] = sums[i] / counts[i];
+ means[i] = static_cast<double>(sums[i] / counts[i]);
continue;
}
means[i] = 0;
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index d9a7623244c..85868d63da7 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -684,18 +684,18 @@ TEST(GroupBy, MeanOnly) {
use_threads));
SortBy({"key_0"}, &aggregated_and_grouped);
- AssertDatumsEqual(ArrayFromJSON(struct_({
- field("hash_mean", float64()),
- field("key_0", int64()),
- }),
- R"([
+ AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+ field("hash_mean", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
[2.125, 1],
[-0.041666666666666664, 2],
[null, 3],
[2.375, null]
])"),
- aggregated_and_grouped,
- /*verbose=*/true);
+ aggregated_and_grouped,
+ /*verbose=*/true);
}
}
From 44403a0e2330816c1f06685a03d777548e5de572 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 23 Jul 2021 14:10:16 -0400
Subject: [PATCH 4/7] ARROW-13295: [C++] More ApproxEquals
---
.../compute/kernels/hash_aggregate_test.cc | 32 +++++++++----------
1 file changed, 16 insertions(+), 16 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 85868d63da7..eea75156247 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -728,19 +728,19 @@ TEST(GroupBy, VarianceAndStddev) {
{"hash_stddev", nullptr},
}));
- AssertDatumsEqual(ArrayFromJSON(struct_({
- field("hash_variance", float64()),
- field("hash_stddev", float64()),
- field("key_0", int64()),
- }),
- R"([
+ AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+ field("hash_variance", float64()),
+ field("hash_stddev", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
[1.0, 1.0, 1],
[0.22222222222222224, 0.4714045207910317, 2],
[null, null, 3],
[2.25, 1.5, null]
])"),
- aggregated_and_grouped,
- /*verbose=*/true);
+ aggregated_and_grouped,
+ /*verbose=*/true);
batch = RecordBatchFromJSON(
schema({field("argument", float64()), field("key", int64())}), R"([
@@ -769,19 +769,19 @@ TEST(GroupBy, VarianceAndStddev) {
{"hash_stddev", nullptr},
}));
- AssertDatumsEqual(ArrayFromJSON(struct_({
- field("hash_variance", float64()),
- field("hash_stddev", float64()),
- field("key_0", int64()),
- }),
- R"([
+ AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+ field("hash_variance", float64()),
+ field("hash_stddev", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
[1.0, 1.0, 1],
[0.22222222222222224, 0.4714045207910317, 2],
[null, null, 3],
[2.25, 1.5, null]
])"),
- aggregated_and_grouped,
- /*verbose=*/true);
+ aggregated_and_grouped,
+ /*verbose=*/true);
}
TEST(GroupBy, MinMaxOnly) {
From 98b004e7060f559a04fe88055beb0b745ba8f126 Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 26 Jul 2021 11:07:15 -0400
Subject: [PATCH 5/7] ARROW-13295: [C++] Improve tests, comments, cast
---
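The cast change in GroupedMeanImpl matters for integer accumulators: patch 3 applied the cast after the division, so the quotient was computed with integer arithmetic and truncated before being widened. A small standalone illustration of the difference:

    #include <cstdint>
    #include <cstdio>

    int main() {
      const int64_t sum = 7;
      const int64_t count = 2;
      const double wrong = static_cast<double>(sum / count);  // 3.0 (truncated)
      const double right = static_cast<double>(sum) / count;  // 3.5
      std::printf("%g %g\n", wrong, right);
      return 0;
    }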
.../arrow/compute/kernels/hash_aggregate.cc | 8 +++--
.../compute/kernels/hash_aggregate_test.cc | 30 +++++++++++++++++++
2 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index b4d4fc09f13..57b43137385 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -919,6 +919,8 @@ struct GroupedSumImpl : public GroupedAggregator {
auto sums = reinterpret_cast<SumType*>(sums_.mutable_data());
auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+ // XXX this uses naive summation; we should switch to pairwise summation as was
+ // done for the scalar aggregate kernel in ARROW-11758
auto g = batch[1].array()->GetValues<uint32_t>(1);
VisitArrayDataInline(
*batch[0].array(),
@@ -1024,7 +1026,7 @@ struct GroupedMeanImpl : public GroupedSumImpl {
double* means = reinterpret_cast<double*>(values->mutable_data());
for (int64_t i = 0; i < num_groups_; ++i) {
if (counts[i] > 0) {
- means[i] = static_cast<double>(sums[i] / counts[i]);
+ means[i] = static_cast<double>(sums[i]) / counts[i];
continue;
}
means[i] = 0;
@@ -1308,11 +1310,11 @@ struct GroupedVarStdFactory {
}
Status Visit(const HalfFloatType& type) {
- return Status::NotImplemented("Summing data of type ", type);
+ return Status::NotImplemented("Computing variance/stddev of data of type ", type);
}
Status Visit(const DataType& type) {
- return Status::NotImplemented("Summing data of type ", type);
+ return Status::NotImplemented("Computing variance/stddev of data of type ", type);
}
static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index eea75156247..f4df6aa18a3 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -782,6 +782,36 @@ TEST(GroupBy, VarianceAndStddev) {
])"),
aggregated_and_grouped,
/*verbose=*/true);
+
+ // Test ddof
+ VarianceOptions variance_options(/*ddof=*/2);
+ ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
+ internal::GroupBy(
+ {
+ batch->GetColumnByName("argument"),
+ batch->GetColumnByName("argument"),
+ },
+ {
+ batch->GetColumnByName("key"),
+ },
+ {
+ {"hash_variance", &variance_options},
+ {"hash_stddev", &variance_options},
+ }));
+
+ AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+ field("hash_variance", float64()),
+ field("hash_stddev", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
+ [null, null, 1],
+ [0.6666666666666667, 0.816496580927726, 2],
+ [null, null, 3],
+ [null, null, null]
+ ])"),
+ aggregated_and_grouped,
+ /*verbose=*/true);
}
TEST(GroupBy, MinMaxOnly) {
From df994626e0e6186edb48039e5f768b445986d13c Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 27 Jul 2021 11:46:29 -0400
Subject: [PATCH 6/7] ARROW-13295: [C++] Try to share math between scalar/hash
variance kernels
---
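The new IntegerVarStd helper centralizes the exact integer accumulation that was previously duplicated: callers feed it raw values and read back the derived moments. A rough usage fragment, assuming the new header and Arrow's Int32Type (values 1 and 3 give count 2, mean 2.0, m2 2.0):

    #include "arrow/compute/kernels/aggregate_var_std_internal.h"
    #include "arrow/type.h"

    using arrow::compute::internal::IntegerVarStd;

    void Example() {
      IntegerVarStd<arrow::Int32Type> acc;
      acc.ConsumeOne(1);
      acc.ConsumeOne(3);
      // acc.count == 2, acc.mean() == 2.0, acc.m2() == sum((x - mean)^2) == 2.0
    }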
.../compute/kernels/aggregate_var_std.cc | 29 +++-----
.../kernels/aggregate_var_std_internal.h | 66 +++++++++++++++++++
.../arrow/compute/kernels/hash_aggregate.cc | 41 ++++--------
3 files changed, 86 insertions(+), 50 deletions(-)
create mode 100644 cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index 17f30c9b86b..6fa49d03d76 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -19,6 +19,7 @@
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/aggregate_var_std_internal.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/int128_internal.h"
@@ -85,32 +86,22 @@ struct VarStdState {
valid_count -= count;
if (count > 0) {
- int64_t sum = 0;
- int128_t square_sum = 0;
+ IntegerVarStd<ArrowType> var_std;
const ArrayData& data = *slice->data();
const CType* values = data.GetValues<CType>(1);
VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
const auto value = values[pos + i];
- sum += value;
- square_sum += static_cast<uint64_t>(value) * value;
+ var_std.ConsumeOne(value);
}
});
- const double mean = static_cast<double>(sum) / count;
- // calculate m2 = square_sum - sum * sum / count
- // decompose `sum * sum / count` into integers and fractions
- const int128_t sum_square = static_cast<int128_t>(sum) * sum;
- const int128_t integers = sum_square / count;
- const double fractions = static_cast<double>(sum_square % count) / count;
- const double m2 = static_cast<double>(square_sum - integers) - fractions;
-
// merge variance
ThisType state;
- state.count = count;
- state.mean = mean;
- state.m2 = m2;
+ state.count = var_std.count;
+ state.mean = var_std.mean();
+ state.m2 = var_std.m2();
this->MergeFrom(state);
}
}
@@ -128,12 +119,8 @@ struct VarStdState {
this->m2 = state.m2;
return;
}
- double mean = (this->mean * this->count + state.mean * state.count) /
- (this->count + state.count);
- this->m2 += state.m2 + this->count * (this->mean - mean) * (this->mean - mean) +
- state.count * (state.mean - mean) * (state.mean - mean);
- this->count += state.count;
- this->mean = mean;
+ MergeVarStd(this->count, this->mean, state.count, state.mean, state.m2, &this->count,
+ &this->mean, &this->m2);
}
int64_t count = 0;
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
new file mode 100644
index 00000000000..187f1cb7957
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/int128_internal.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+using arrow::internal::int128_t;
+
+// Accumulate sum/squared sum (using naive summation)
+// Shared implementation between scalar/hash aggregate variance/stddev kernels
+template <typename ArrowType>
+struct IntegerVarStd {
+ using c_type = typename ArrowType::c_type;
+
+ int64_t count = 0;
+ int64_t sum = 0;
+ int128_t square_sum = 0;
+
+ void ConsumeOne(const c_type value) {
+ sum += value;
+ square_sum += static_cast<uint64_t>(value) * value;
+ count++;
+ }
+
+ double mean() const { return static_cast<double>(sum) / count; }
+
+ double m2() const {
+ // calculate m2 = square_sum - sum * sum / count
+ // decompose `sum * sum / count` into integers and fractions
+ const int128_t sum_square = static_cast<int128_t>(sum) * sum;
+ const int128_t integers = sum_square / count;
+ const double fractions = static_cast<double>(sum_square % count) / count;
+ return static_cast<double>(square_sum - integers) - fractions;
+ }
+};
+
+static inline void MergeVarStd(int64_t count1, double mean1, int64_t count2, double mean2,
+ double m22, int64_t* out_count, double* out_mean,
+ double* out_m2) {
+ double mean = (mean1 * count1 + mean2 * count2) / (count1 + count2);
+ *out_m2 += m22 + count1 * (mean1 - mean) * (mean1 - mean) +
+ count2 * (mean2 - mean) * (mean2 - mean);
+ *out_count += count2;
+ *out_mean = mean;
+}
+
+} // namespace internal
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 57b43137385..472ae956388 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -35,6 +35,7 @@
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/aggregate_var_std_internal.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bitmap_ops.h"
@@ -1162,8 +1163,7 @@ struct GroupedVarStdImpl : public GroupedAggregator {
const auto& array = *batch[0].array();
const auto g = batch[1].array()->GetValues<uint32_t>(1);
- std::vector<int64_t> sum(num_groups_);
- std::vector<int128_t> square_sum(num_groups_);
+ std::vector<IntegerVarStd<Type>> var_std(num_groups_);
ARROW_ASSIGN_OR_RAISE(auto mapping,
AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
@@ -1179,8 +1179,8 @@ struct GroupedVarStdImpl : public GroupedAggregator {
// process in chunks so that overflow can never happen
// reset state
- std::fill(sum.begin(), sum.end(), 0);
- std::fill(square_sum.begin(), square_sum.end(), 0);
+ var_std.clear();
+ var_std.resize(num_groups_);
GroupedVarStdImpl state;
RETURN_NOT_OK(state.Init(ctx_, &options_));
RETURN_NOT_OK(state.Resize(num_groups_));
@@ -1195,26 +1195,16 @@ struct GroupedVarStdImpl : public GroupedAggregator {
for (int64_t i = 0; i < len; ++i) {
const int64_t index = start_index + pos + i;
const auto value = values[index];
- sum[g[index]] += value;
- square_sum[g[index]] += static_cast<uint64_t>(value) * value;
- other_counts[g[index]]++;
+ var_std[g[index]].ConsumeOne(value);
}
});
for (int64_t i = 0; i < num_groups_; i++) {
- if (other_counts[i] == 0) continue;
-
- const double mean = static_cast<double>(sum[i]) / other_counts[i];
- // calculate m2 = square_sum - sum * sum / count
- // decompose `sum * sum / count` into integers and fractions
- const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
- const int128_t integers = sum_square / other_counts[i];
- const double fractions =
- static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
- const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
-
- other_means[i] = mean;
- other_m2s[i] = m2;
+ if (var_std[i].count == 0) continue;
+
+ other_counts[i] = var_std[i].count;
+ other_means[i] = var_std[i].mean();
+ other_m2s[i] = var_std[i].m2();
}
RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
}
@@ -1237,15 +1227,8 @@ struct GroupedVarStdImpl : public GroupedAggregator {
auto g = group_id_mapping.GetValues<uint32_t>(1);
for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
if (other_counts[other_g] == 0) continue;
- const double mean =
- (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
- (counts[*g] + other_counts[other_g]);
- m2s[*g] += other_m2s[other_g] +
- counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
- other_counts[other_g] * (other_means[other_g] - mean) *
- (other_means[other_g] - mean);
- counts[*g] += other_counts[other_g];
- means[*g] = mean;
+ MergeVarStd(counts[*g], means[*g], other_counts[other_g], other_means[other_g],
+ other_m2s[other_g], &counts[*g], &means[*g], &m2s[*g]);
}
return Status::OK();
}
From 90c91fa6cdc7c32bf00dd2e88cba27c79dbab717 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 27 Jul 2021 12:03:55 -0400
Subject: [PATCH 7/7] ARROW-13295: [C++] Add missing pragma
---
cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h | 2 ++
1 file changed, 2 insertions(+)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
index 187f1cb7957..675ebfd91d3 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#pragma once
+
#include "arrow/util/int128_internal.h"
namespace arrow {