Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 121 additions & 1 deletion cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>

#include "arrow/compute/api.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/benchmark_util.h"
Expand Down Expand Up @@ -463,6 +464,111 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntStringPairSet, [&] {
BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {int_key, str_key});
});

//
// HashAggregate with one group (= scalar aggregate)
//

namespace internal {

// Forward declarations of hash-aggregate plumbing that is not exposed in a
// public header (presumably defined in hash_aggregate.cc — confirm).
// Redeclared here so the benchmark can drive HashAggregateKernels directly
// instead of going through the grouping machinery.

// Looks up one HashAggregateKernel per requested aggregate, matched against
// the given input value descriptors.
Result<std::vector<const HashAggregateKernel*>> GetKernels(
    ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
    const std::vector<ValueDescr>& in_descrs);

// Creates one KernelState per kernel, initialized with each aggregate's
// options; states are consumed/finalized by the caller.
Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
    const std::vector<internal::Aggregate>& aggregates,
    const std::vector<ValueDescr>& in_descrs);

// Resolves each kernel's output type into a field of the result struct.
Result<FieldVector> ResolveKernels(
    const std::vector<internal::Aggregate>& aggregates,
    const std::vector<const HashAggregateKernel*>& kernels,
    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
    const std::vector<ValueDescr>& descrs);

} // namespace internal

// Runs the given hash-aggregate kernels over `arguments` with every row
// assigned to a single group (id 0), which makes a hash aggregation behave
// like a scalar aggregation. Returns the ArrayData of a struct array with
// one child per aggregate. Fails if no aggregates are given.
static Result<std::shared_ptr<ArrayData>> ScalarAggregate(
    const std::vector<internal::Aggregate>& aggregates,
    const std::vector<Datum>& arguments) {
  ExecContext* ctx = default_exec_context();

  // Construct and initialize HashAggregateKernels
  ARROW_ASSIGN_OR_RAISE(auto argument_descrs,
                        ExecBatch::Make(arguments).Map(
                            [](ExecBatch batch) { return batch.GetDescriptors(); }));

  ARROW_ASSIGN_OR_RAISE(auto kernels, GetKernels(ctx, aggregates, argument_descrs));

  std::vector<std::unique_ptr<KernelState>> states;
  ARROW_ASSIGN_OR_RAISE(states, InitKernels(kernels, ctx, aggregates, argument_descrs));

  ARROW_ASSIGN_OR_RAISE(
      FieldVector out_fields,
      ResolveKernels(aggregates, kernels, states, ctx, argument_descrs));

  using arrow::compute::detail::ExecBatchIterator;

  ARROW_ASSIGN_OR_RAISE(auto argument_batch_iterator,
                        ExecBatchIterator::Make(arguments, ctx->exec_chunksize()));

  // Since every row belongs to group 0, build one zero-filled uint32 id
  // array sized for the largest possible chunk and slice it per batch.
  ARROW_ASSIGN_OR_RAISE(
      auto data,
      AllocateBuffer(argument_batch_iterator->max_chunksize() * sizeof(uint32_t),
                     ctx->memory_pool()));
  auto data_ptr = reinterpret_cast<uint32_t*>(data->mutable_data());
  std::fill(data_ptr, data_ptr + argument_batch_iterator->max_chunksize(), 0);
  const UInt32Array ids(uint32(), argument_batch_iterator->max_chunksize(),
                        std::move(data), /*null_bitmap=*/nullptr, /*null_count=*/0,
                        /*offset=*/0);

  // start "streaming" execution: feed each chunk to each kernel in turn
  ExecBatch argument_batch;
  while (argument_batch_iterator->Next(&argument_batch)) {
    for (size_t i = 0; i < kernels.size(); ++i) {
      KernelContext batch_ctx{ctx};
      batch_ctx.SetState(states[i].get());
      ARROW_ASSIGN_OR_RAISE(
          auto batch,
          ExecBatch::Make({argument_batch[i], ids.Slice(0, argument_batch[i].length())}));
      // Only a single group (id 0) ever exists.
      RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, 1));
      RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
    }
  }

  // Finalize output. Sized by kernels.size() (was arguments.size()): the
  // loop below indexes by kernel, so a size mismatch would either leave
  // default-constructed (null) children or write out of bounds.
  ArrayDataVector out_data(kernels.size());
  for (size_t i = 0; i < kernels.size(); ++i) {
    KernelContext batch_ctx{ctx};
    batch_ctx.SetState(states[i].get());
    Datum out;
    RETURN_NOT_OK(kernels[i]->finalize(&batch_ctx, &out));
    out_data[i] = out.array();
  }
  // Guard before dereferencing out_data[0] below.
  if (out_data.empty()) {
    return Status::Invalid("ScalarAggregate requires at least one aggregate");
  }
  int64_t length = out_data[0]->length;
  return ArrayData::Make(struct_(std::move(out_fields)), length,
                         {/*null_bitmap=*/nullptr}, std::move(out_data),
                         /*null_count=*/0);
}

static void BenchmarkAggregate(benchmark::State& state,
std::vector<internal::Aggregate> aggregates,
std::vector<Datum> arguments) {
for (auto _ : state) {
ABORT_NOT_OK(ScalarAggregate(aggregates, arguments).status());
}
}

// Benchmarks hash_min_max driven through ScalarAggregate with a single
// group, over random numeric data of the given ArrowType. Array length is
// derived from the regression arg size so results are comparable across
// element widths.
template <typename ArrowType>
static void MinMaxAggregateBench(benchmark::State& state) {
  using CType = typename TypeTraits<ArrowType>::CType;

  RegressionArgs args(state);
  const int64_t num_values = args.size / sizeof(CType);
  random::RandomArrayGenerator generator(1923);
  const auto values =
      generator.Numeric<ArrowType>(num_values, -100, 100, args.null_proportion);

  BenchmarkAggregate(state, {{"hash_min_max", NULLPTR}}, {values});
}

//
// Sum
//
Expand Down Expand Up @@ -577,7 +683,11 @@ static void MinMaxKernelBenchArgs(benchmark::internal::Benchmark* bench) {

// For a given element Type, registers two benchmarks: FuncName runs the
// scalar min_max kernel (MinMaxKernelBench), and FuncName##Aggregate runs
// the equivalent single-group hash_min_max aggregation
// (MinMaxAggregateBench), both swept over MinMaxKernelBenchArgs so the two
// paths can be compared directly.
#define MINMAX_KERNEL_BENCHMARK(FuncName, Type) \
static void FuncName(benchmark::State& state) { MinMaxKernelBench<Type>(state); } \
BENCHMARK(FuncName)->Apply(MinMaxKernelBenchArgs); \
static void FuncName##Aggregate(benchmark::State& state) { \
MinMaxAggregateBench<Type>(state); \
} \
BENCHMARK(FuncName##Aggregate)->Apply(MinMaxKernelBenchArgs)

MINMAX_KERNEL_BENCHMARK(MinMaxKernelFloat, FloatType);
MINMAX_KERNEL_BENCHMARK(MinMaxKernelDouble, DoubleType);
Expand All @@ -600,7 +710,17 @@ static void CountKernelBenchInt64(benchmark::State& state) {
ABORT_NOT_OK(Count(array->Slice(1, array_size)).status());
}
}
// Benchmarks hash_count driven through ScalarAggregate with a single group,
// over random int64 data; the counterpart to the scalar Count benchmark
// registered below.
static void CountKernelBenchInt64Aggregate(benchmark::State& state) {
  RegressionArgs args(state);
  const int64_t num_values = args.size / sizeof(int64_t);
  random::RandomArrayGenerator generator(1923);
  const auto values =
      generator.Numeric<Int64Type>(num_values, -100, 100, args.null_proportion);

  BenchmarkAggregate(state, {{"hash_count", NULLPTR}}, {values});
}
BENCHMARK(CountKernelBenchInt64)->Args({1 * 1024 * 1024, 2});  // 1M with 50% null.
BENCHMARK(CountKernelBenchInt64Aggregate)
    ->Args({1 * 1024 * 1024, 2});  // 1M with 50% null.

//
// Variance
Expand Down