From 1784a63df1eca6a267f736ad60ceb623a93ca733 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 27 Jul 2021 09:19:19 -0400
Subject: [PATCH] ARROW-13451: [C++] WIP: benchmark using hash aggregate
kernels for scalar aggregation
---
.../compute/kernels/aggregate_benchmark.cc | 122 +++++++++++++++++-
1 file changed, 121 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
index 39cfeb039a8..9ecc813d354 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
@@ -20,6 +20,7 @@
#include <vector>

#include "arrow/compute/api.h"
+#include "arrow/compute/exec_internal.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/benchmark_util.h"
@@ -463,6 +464,111 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntStringPairSet, [&] {
BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {int_key, str_key});
});
+//
+// HashAggregate with one group (= scalar aggregate)
+//
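+// Feeds all rows to the hash aggregate kernels under a single group id (0),
+// so the result is equivalent to a scalar aggregation and the hash kernels
+// can be compared directly against the scalar aggregate benchmarks below.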
+
+namespace internal {
+
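+// These helpers are implemented in hash_aggregate.cc but not exposed in a
+// public header, so they are redeclared here for the benchmark.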
+Result<std::vector<const HashAggregateKernel*>> GetKernels(
+    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
+    const std::vector<ValueDescr>& in_descrs);
+
+Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
+    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
+    const std::vector<Aggregate>& aggregates,
+    const std::vector<ValueDescr>& in_descrs);
+
+Result<FieldVector> ResolveKernels(
+    const std::vector<Aggregate>& aggregates,
+    const std::vector<const HashAggregateKernel*>& kernels,
+    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
+    const std::vector<ValueDescr>& descrs);
+
+} // namespace internal
+
+static Result<std::shared_ptr<ArrayData>> ScalarAggregate(
+    const std::vector<internal::Aggregate>& aggregates,
+    const std::vector<Datum>& arguments) {
+ ExecContext* ctx = default_exec_context();
+
+ // Construct and initialize HashAggregateKernels
+ ARROW_ASSIGN_OR_RAISE(auto argument_descrs,
+ ExecBatch::Make(arguments).Map(
+ [](ExecBatch batch) { return batch.GetDescriptors(); }));
+
+ ARROW_ASSIGN_OR_RAISE(auto kernels, GetKernels(ctx, aggregates, argument_descrs));
+
+  std::vector<std::unique_ptr<KernelState>> states;
+ ARROW_ASSIGN_OR_RAISE(states, InitKernels(kernels, ctx, aggregates, argument_descrs));
+
+ ARROW_ASSIGN_OR_RAISE(
+ FieldVector out_fields,
+ ResolveKernels(aggregates, kernels, states, ctx, argument_descrs));
+
+ using arrow::compute::detail::ExecBatchIterator;
+
+ ARROW_ASSIGN_OR_RAISE(auto argument_batch_iterator,
+ ExecBatchIterator::Make(arguments, ctx->exec_chunksize()));
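+
+  // Build a group-id column of all zeros, sized to the largest chunk the
+  // iterator can produce; a slice of it accompanies each argument batch.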
+ ARROW_ASSIGN_OR_RAISE(
+ auto data,
+ AllocateBuffer(argument_batch_iterator->max_chunksize() * sizeof(uint32_t),
+ ctx->memory_pool()));
+  auto data_ptr = reinterpret_cast<uint32_t*>(data->mutable_data());
+ std::fill(data_ptr, data_ptr + argument_batch_iterator->max_chunksize(), 0);
+  const UInt32Array ids(argument_batch_iterator->max_chunksize(), std::move(data),
+                        /*null_bitmap=*/nullptr, /*null_count=*/0, /*offset=*/0);
+
+  // Start "streaming" execution: push each chunk through every kernel's
+  // consume(), accumulating into the single-group states.
+ ExecBatch argument_batch;
+ while (argument_batch_iterator->Next(&argument_batch)) {
+ for (size_t i = 0; i < kernels.size(); ++i) {
+ KernelContext batch_ctx{ctx};
+ batch_ctx.SetState(states[i].get());
+ ARROW_ASSIGN_OR_RAISE(
+ auto batch,
+ ExecBatch::Make({argument_batch[i], ids.Slice(0, argument_batch[i].length())}));
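+      // Only one group ever exists, so a single state slot suffices.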
+ RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, 1));
+ RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
+ }
+ }
+
+ // Finalize output
+ ArrayDataVector out_data(arguments.size());
+ for (size_t i = 0; i < kernels.size(); ++i) {
+ KernelContext batch_ctx{ctx};
+ batch_ctx.SetState(states[i].get());
+ Datum out;
+ RETURN_NOT_OK(kernels[i]->finalize(&batch_ctx, &out));
+ out_data[i] = out.array();
+ }
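+
+  // Package the per-kernel results as children of a struct array, mirroring
+  // the shape of a GroupBy result.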
+ int64_t length = out_data[0]->length;
+ return ArrayData::Make(struct_(std::move(out_fields)), length,
+ {/*null_bitmap=*/nullptr}, std::move(out_data),
+ /*null_count=*/0);
+}
+
+static void BenchmarkAggregate(benchmark::State& state,
+                               std::vector<internal::Aggregate> aggregates,
+                               std::vector<Datum> arguments) {
+ for (auto _ : state) {
+ ABORT_NOT_OK(ScalarAggregate(aggregates, arguments).status());
+ }
+}
+
+template <typename Type>
+static void MinMaxAggregateBench(benchmark::State& state) {
+  using CType = typename TypeTraits<Type>::CType;
+
+ RegressionArgs args(state);
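+  // args.size is in bytes; derive the element count for the value type.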
+ const int64_t array_size = args.size / sizeof(CType);
+ auto rand = random::RandomArrayGenerator(1923);
+  auto array = rand.Numeric<Type>(array_size, -100, 100, args.null_proportion);
+
+ BenchmarkAggregate(state, {{"hash_min_max", NULLPTR}}, {array});
+}
+
//
// Sum
//
@@ -577,7 +683,11 @@ static void MinMaxKernelBenchArgs(benchmark::internal::Benchmark* bench) {
#define MINMAX_KERNEL_BENCHMARK(FuncName, Type) \
static void FuncName(benchmark::State& state) { MinMaxKernelBench<Type>(state); } \
- BENCHMARK(FuncName)->Apply(MinMaxKernelBenchArgs)
+ BENCHMARK(FuncName)->Apply(MinMaxKernelBenchArgs); \
+ static void FuncName##Aggregate(benchmark::State& state) { \
+    MinMaxAggregateBench<Type>(state); \
+ } \
+ BENCHMARK(FuncName##Aggregate)->Apply(MinMaxKernelBenchArgs)
MINMAX_KERNEL_BENCHMARK(MinMaxKernelFloat, FloatType);
MINMAX_KERNEL_BENCHMARK(MinMaxKernelDouble, DoubleType);
@@ -600,7 +710,17 @@ static void CountKernelBenchInt64(benchmark::State& state) {
ABORT_NOT_OK(Count(array->Slice(1, array_size)).status());
}
}
+static void CountKernelBenchInt64Aggregate(benchmark::State& state) {
+ RegressionArgs args(state);
+ const int64_t array_size = args.size / sizeof(int64_t);
+ auto rand = random::RandomArrayGenerator(1923);
+  auto array = rand.Numeric<Int64Type>(array_size, -100, 100, args.null_proportion);
+
+ BenchmarkAggregate(state, {{"hash_count", NULLPTR}}, {array});
+}
BENCHMARK(CountKernelBenchInt64)->Args({1 * 1024 * 1024, 2}); // 1M with 50% null.
+BENCHMARK(CountKernelBenchInt64Aggregate)
+ ->Args({1 * 1024 * 1024, 2}); // 1M with 50% null.
//
// Variance