From 4edad4a6d4e311fccea7361b3b2a8f4f044b6024 Mon Sep 17 00:00:00 2001 From: Rok Date: Thu, 25 Mar 2021 17:17:39 +0100 Subject: [PATCH 01/23] Adding ScalarAggregateOptions. --- cpp/src/arrow/compute/api_aggregate.cc | 10 +- cpp/src/arrow/compute/api_aggregate.h | 33 +++- .../arrow/compute/kernels/aggregate_basic.cc | 31 +++- .../compute/kernels/aggregate_basic_avx2.cc | 18 +- .../compute/kernels/aggregate_basic_avx512.cc | 18 +- .../kernels/aggregate_basic_internal.h | 43 ++++- .../arrow/compute/kernels/aggregate_test.cc | 160 ++++++++++++++---- 7 files changed, 257 insertions(+), 56 deletions(-) diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 5afa1048960..3ee58f11d76 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -29,12 +29,14 @@ Result Count(const Datum& value, CountOptions options, ExecContext* ctx) return CallFunction("count", {value}, &options, ctx); } -Result Mean(const Datum& value, ExecContext* ctx) { - return CallFunction("mean", {value}, ctx); +Result Mean(const Datum& value, const ScalarAggregateOptions& options, + ExecContext* ctx) { + return CallFunction("mean", {value}, &options, ctx); } -Result Sum(const Datum& value, ExecContext* ctx) { - return CallFunction("sum", {value}, ctx); +Result Sum(const Datum& value, const ScalarAggregateOptions& options, + ExecContext* ctx) { + return CallFunction("sum", {value}, &options, ctx); } Result MinMax(const Datum& value, const MinMaxOptions& options, ExecContext* ctx) { diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index ca118ec5678..4b25d7faa52 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -37,6 +37,27 @@ class ExecContext; // ---------------------------------------------------------------------- // Aggregate functions +/// \brief Control general scalar aggregate kernel behavior +/// +/// By default, null values are ignored +struct ARROW_EXPORT ScalarAggregateOptions : public FunctionOptions { + enum Mode { + /// Skip null values. + SKIPNA = 0, + /// Calculate over all values. + KEEPNA, + }; + + explicit ScalarAggregateOptions(enum Mode null_handling = SKIPNA, + uint32_t min_count = 0) + : null_handling(null_handling), min_count(min_count) {} + + static ScalarAggregateOptions Defaults() { return ScalarAggregateOptions{}; } + + enum Mode null_handling; + uint32_t min_count = 0; +}; + /// \addtogroup compute-concrete-options /// @{ @@ -167,24 +188,32 @@ Result Count(const Datum& datum, CountOptions options = CountOptions::Def /// \brief Compute the mean of a numeric array. /// /// \param[in] value datum to compute the mean, expecting Array +/// \param[in] options see ScalarAggregateOptions for more information /// \param[in] ctx the function execution context, optional /// \return datum of the computed mean as a DoubleScalar /// /// \since 1.0.0 /// \note API not yet finalized ARROW_EXPORT -Result Mean(const Datum& value, ExecContext* ctx = NULLPTR); +Result Mean( + const Datum& value, + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), + ExecContext* ctx = NULLPTR); /// \brief Sum values of a numeric array. /// /// \param[in] value datum to sum, expecting Array or ChunkedArray +/// \param[in] options see ScalarAggregateOptions for more information /// \param[in] ctx the function execution context, optional /// \return datum of the computed sum as a Scalar /// /// \since 1.0.0 /// \note API not yet finalized ARROW_EXPORT -Result Sum(const Datum& value, ExecContext* ctx = NULLPTR); +Result Sum( + const Datum& value, + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), + ExecContext* ctx = NULLPTR); /// \brief Calculate the min / max of a numeric array /// diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index e4eec50c66d..6f7624a73e1 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -103,20 +103,30 @@ Result> CountInit(KernelContext*, // Sum implementation template -struct SumImplDefault : public SumImpl {}; +struct SumImplDefault : public SumImpl { + explicit SumImplDefault(const ScalarAggregateOptions options) : options(std::move(options)) {} + ScalarAggregateOptions options; +}; template -struct MeanImplDefault : public MeanImpl {}; +struct MeanImplDefault : public MeanImpl { + explicit MeanImplDefault(const ScalarAggregateOptions options) : options(std::move(options)) {} + ScalarAggregateOptions options; +}; Result> SumInit(KernelContext* ctx, const KernelInitArgs& args) { - SumLikeInit visitor(ctx, *args.inputs[0].type); + SumLikeInit visitor( + ctx, *args.inputs[0].type, + static_cast(*args.options)); return visitor.Create(); } Result> MeanInit(KernelContext* ctx, const KernelInitArgs& args) { - SumLikeInit visitor(ctx, *args.inputs[0].type); + SumLikeInit visitor( + ctx, *args.inputs[0].type, + static_cast(*args.options)); return visitor.Create(); } @@ -245,6 +255,9 @@ void AddMinMaxKernels(KernelInit init, namespace internal { namespace { +//using ScalarAggregateState = internal::OptionsWrapper; +////const auto& state = checked_cast&>(*ctx->state()); + const FunctionDoc count_doc{"Count the number of null / non-null values", ("By default, non-null values are counted.\n" "This can be changed through CountOptions."), @@ -286,7 +299,10 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { aggregate::CountInit, func.get()); DCHECK_OK(registry->AddFunction(std::move(func))); - func = std::make_shared("sum", Arity::Unary(), &sum_doc); + static ScalarAggregateOptions sum_default_scalar_aggregate_options = + ScalarAggregateOptions::Defaults(); + func = std::make_shared("sum", Arity::Unary(), &sum_doc, + &sum_default_scalar_aggregate_options); aggregate::AddBasicAggKernels(aggregate::SumInit, {boolean()}, int64(), func.get()); aggregate::AddBasicAggKernels(aggregate::SumInit, SignedIntTypes(), int64(), func.get()); @@ -310,7 +326,10 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { #endif DCHECK_OK(registry->AddFunction(std::move(func))); - func = std::make_shared("mean", Arity::Unary(), &mean_doc); + static ScalarAggregateOptions mean_default_scalar_aggregate_options = + ScalarAggregateOptions::Defaults(); + func = std::make_shared( + "mean", Arity::Unary(), &mean_doc, &mean_default_scalar_aggregate_options); aggregate::AddBasicAggKernels(aggregate::MeanInit, {boolean()}, float64(), func.get()); aggregate::AddBasicAggKernels(aggregate::MeanInit, NumericTypes(), float64(), func.get()); diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc index a70363aab9b..6a3049a8317 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc @@ -25,20 +25,30 @@ namespace aggregate { // Sum implementation template -struct SumImplAvx2 : public SumImpl {}; +struct SumImplAvx2 : public SumImpl { + explicit SumImplAvx2(const ScalarAggregateOptions options) : options(std::move(options)) {} + ScalarAggregateOptions options; +}; template -struct MeanImplAvx2 : public MeanImpl {}; +struct MeanImplAvx2 : public MeanImpl { + explicit MeanImplAvx2(const ScalarAggregateOptions options) : options(std::move(options)) {} + ScalarAggregateOptions options; +}; Result> SumInitAvx2(KernelContext* ctx, const KernelInitArgs& args) { - SumLikeInit visitor(ctx, *args.inputs[0].type); + SumLikeInit visitor( + ctx, *args.inputs[0].type, + static_cast(*args.options)); return visitor.Create(); } Result> MeanInitAvx2(KernelContext* ctx, const KernelInitArgs& args) { - SumLikeInit visitor(ctx, *args.inputs[0].type); + SumLikeInit visitor( + ctx, *args.inputs[0].type, + static_cast(*args.options)); return visitor.Create(); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc index 1ecbd7041e6..499c4b58f4a 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc @@ -25,20 +25,30 @@ namespace aggregate { // Sum implementation template -struct SumImplAvx512 : public SumImpl {}; +struct SumImplAvx512 : public SumImpl { + explicit SumImplAvx512(const ScalarAggregateOptions options) : options(std::move(options)) {} + ScalarAggregateOptions options; +}; template -struct MeanImplAvx512 : public MeanImpl {}; +struct MeanImplAvx512 : public MeanImpl { + explicit MeanImplAvx512(const ScalarAggregateOptions options) : options(std::move(options)) {} + ScalarAggregateOptions options; +}; Result> SumInitAvx512(KernelContext* ctx, const KernelInitArgs& args) { - SumLikeInit visitor(ctx, *args.inputs[0].type); + SumLikeInit visitor( + ctx, *args.inputs[0].type, + static_cast(*args.options)); return visitor.Create(); } Result> MeanInitAvx512(KernelContext* ctx, const KernelInitArgs& args) { - SumLikeInit visitor(ctx, *args.inputs[0].type); + SumLikeInit visitor( + ctx, *args.inputs[0].type, + static_cast(*args.options)); return visitor.Create(); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index f8db180b1e3..f8ae452ca96 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -60,6 +60,7 @@ struct SumImpl : public ScalarAggregator { Status Consume(KernelContext*, const ExecBatch& batch) override { const auto& data = batch[0].array(); + this->length = data->length; this->count = data->length - data->GetNullCount(); if (is_boolean_type::value) { this->sum = static_cast(BooleanArray(data).true_count()); @@ -72,12 +73,24 @@ struct SumImpl : public ScalarAggregator { Status MergeFrom(KernelContext*, KernelState&& src) override { const auto& other = checked_cast(src); + this->length += other.length; this->count += other.count; this->sum += other.sum; return Status::OK(); } Status Finalize(KernelContext*, Datum* out) override { + const auto& state = checked_cast(*ctx->state()); +// ARROW_LOG(INFO) << "options.min_count: " << options.min_count; + ARROW_LOG(INFO) << "SumImpl: options.null_handling: " << options.null_handling << " options.min_count: " << options.min_count; + +// ARROW_LOG(INFO) << "state.options.min_count: " << state.options.min_count; +// ARROW_LOG(INFO) << "state.options.null_handling: " << state.options.null_handling; + + if (state.options.min_count != 0 || options.min_count != 0) { + ARROW_LOG(FATAL) << "state.options.min_count: " << state.options.min_count; + } + if (this->count == 0) { out->value = std::make_shared(); } else { @@ -86,21 +99,38 @@ struct SumImpl : public ScalarAggregator { return Status::OK(); } + size_t length = 0; size_t count = 0; typename SumType::c_type sum = 0; + ScalarAggregateOptions options; }; template struct MeanImpl : public SumImpl { Status Finalize(KernelContext*, Datum* out) override { - if (this->count == 0) { + // const ScalarAggregateOptions& options = checked_cast(*ctx->state()).options; + const auto& state = checked_cast(*ctx->state()); +// ARROW_LOG(INFO) << "options.min_count: " << options.min_count; + ARROW_LOG(INFO) << "SumImpl: options.null_handling: " << options.null_handling << " options.min_count: " << options.min_count; + +// ARROW_LOG(INFO) << "state.options.min_count: " << state.options.min_count; +// ARROW_LOG(INFO) << "state.options.null_handling: " << state.options.null_handling; + if (state.options.min_count != 0 || options.min_count != 0) { + ARROW_LOG(FATAL) << "state.options.min_count: " << state.options.min_count; + } + + if (this->count == 0 || this->count < options.min_count) { out->value = std::make_shared(); - } else { + } else if (options.null_handling == ScalarAggregateOptions::SKIPNA) { const double mean = static_cast(this->sum) / this->count; out->value = std::make_shared(mean); + } else { + const double mean = static_cast(this->sum) / this->length; + out->value = std::make_shared(mean); } return Status::OK(); } + ScalarAggregateOptions options; }; template