diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index a3fafaae75d..2952eade96b 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -116,22 +116,18 @@ Result> CountInit(KernelContext*, template struct SumImplDefault : public SumImpl { - explicit SumImplDefault(const ScalarAggregateOptions& options_) { - this->options = options_; - } + using SumImpl::SumImpl; }; template struct MeanImplDefault : public MeanImpl { - explicit MeanImplDefault(const ScalarAggregateOptions& options_) { - this->options = options_; - } + using MeanImpl::MeanImpl; }; Result> SumInit(KernelContext* ctx, const KernelInitArgs& args) { SumLikeInit visitor( - ctx, *args.inputs[0].type, + ctx, args.inputs[0].type, static_cast(*args.options)); return visitor.Create(); } @@ -139,7 +135,7 @@ Result> SumInit(KernelContext* ctx, Result> MeanInit(KernelContext* ctx, const KernelInitArgs& args) { SumLikeInit visitor( - ctx, *args.inputs[0].type, + ctx, args.inputs[0].type, static_cast(*args.options)); return visitor.Create(); } @@ -156,7 +152,13 @@ struct ProductImpl : public ScalarAggregator { using ProductType = typename TypeTraits::CType; using OutputType = typename TypeTraits::ScalarType; - explicit ProductImpl(const ScalarAggregateOptions& options) { this->options = options; } + explicit ProductImpl(const std::shared_ptr& out_type, + const ScalarAggregateOptions& options) + : out_type(out_type), + options(options), + count(0), + product(MultiplyTraits::one(*out_type)), + nulls_observed(false) {} Status Consume(KernelContext*, const ExecBatch& batch) override { if (batch[0].is_array()) { @@ -169,11 +171,11 @@ struct ProductImpl : public ScalarAggregator { return Status::OK(); } - VisitArrayDataInline( + internal::VisitArrayValuesInline( *data, [&](typename TypeTraits::CType value) { this->product = - static_cast(to_unsigned(this->product) * to_unsigned(value)); + MultiplyTraits::Multiply(*out_type, this->product, value); }, [] {}); } else { @@ -184,7 +186,7 @@ struct ProductImpl : public ScalarAggregator { for (int64_t i = 0; i < batch.length; i++) { auto value = internal::UnboxScalar::Unbox(data); this->product = - static_cast(to_unsigned(this->product) * to_unsigned(value)); + MultiplyTraits::Multiply(*out_type, this->product, value); } } } @@ -195,7 +197,7 @@ struct ProductImpl : public ScalarAggregator { const auto& other = checked_cast(src); this->count += other.count; this->product = - static_cast(to_unsigned(this->product) * to_unsigned(other.product)); + MultiplyTraits::Multiply(*out_type, this->product, other.product); this->nulls_observed = this->nulls_observed || other.nulls_observed; return Status::OK(); } @@ -203,26 +205,27 @@ struct ProductImpl : public ScalarAggregator { Status Finalize(KernelContext*, Datum* out) override { if ((!options.skip_nulls && this->nulls_observed) || (this->count < options.min_count)) { - out->value = std::make_shared(); + out->value = std::make_shared(out_type); } else { - out->value = MakeScalar(this->product); + out->value = std::make_shared(this->product, out_type); } return Status::OK(); } - size_t count = 0; - bool nulls_observed = false; - typename AccType::c_type product = 1; + std::shared_ptr out_type; ScalarAggregateOptions options; + size_t count; + ProductType product; + bool nulls_observed; }; struct ProductInit { std::unique_ptr state; KernelContext* ctx; - const DataType& type; + const std::shared_ptr& type; const ScalarAggregateOptions& options; - ProductInit(KernelContext* ctx, const DataType& type, + ProductInit(KernelContext* ctx, const std::shared_ptr& type, const ScalarAggregateOptions& options) : ctx(ctx), type(type), options(options) {} @@ -235,24 +238,32 @@ struct ProductInit { } Status Visit(const BooleanType&) { - state.reset(new ProductImpl(options)); + auto ty = TypeTraits::AccType>::type_singleton(); + state.reset(new ProductImpl(ty, options)); return Status::OK(); } template enable_if_number Visit(const Type&) { - state.reset(new ProductImpl(options)); + auto ty = TypeTraits::AccType>::type_singleton(); + state.reset(new ProductImpl(ty, options)); + return Status::OK(); + } + + template + enable_if_decimal Visit(const Type&) { + state.reset(new ProductImpl(type, options)); return Status::OK(); } Result> Create() { - RETURN_NOT_OK(VisitTypeInline(type, this)); + RETURN_NOT_OK(VisitTypeInline(*type, this)); return std::move(state); } static Result> Init(KernelContext* ctx, const KernelInitArgs& args) { - ProductInit visitor(ctx, *args.inputs[0].type, + ProductInit visitor(ctx, args.inputs[0].type, static_cast(*args.options)); return visitor.Create(); } @@ -550,7 +561,8 @@ void AddBasicAggKernels(KernelInit init, SimdLevel::type simd_level) { for (const auto& ty : types) { // array[InT] -> scalar[OutT] - auto sig = KernelSignature::Make({InputType::Array(ty)}, ValueDescr::Scalar(out_ty)); + auto sig = + KernelSignature::Make({InputType::Array(ty->id())}, ValueDescr::Scalar(out_ty)); AddAggKernel(std::move(sig), init, func, simd_level); } } @@ -561,7 +573,8 @@ void AddScalarAggKernels(KernelInit init, ScalarAggregateFunction* func) { for (const auto& ty : types) { // scalar[InT] -> scalar[OutT] - auto sig = KernelSignature::Make({InputType::Scalar(ty)}, ValueDescr::Scalar(out_ty)); + auto sig = + KernelSignature::Make({InputType::Scalar(ty->id())}, ValueDescr::Scalar(out_ty)); AddAggKernel(std::move(sig), init, func, SimdLevel::NONE); } } @@ -598,6 +611,13 @@ void AddMinMaxKernels(KernelInit init, } } +Result ScalarFirstType(KernelContext*, + const std::vector& descrs) { + ValueDescr result = descrs.front(); + result.shape = ValueDescr::SCALAR; + return result; +} + } // namespace aggregate namespace internal { @@ -628,9 +648,12 @@ const FunctionDoc product_doc{ const FunctionDoc mean_doc{ "Compute the mean of a numeric array", ("Null values are ignored by default. Minimum count of non-null\n" - "values can be set and null is returned if too few are " - "present.\nThis can be changed through ScalarAggregateOptions.\n" - "The result is always computed as a double, regardless of the input types."), + "values can be set and null is returned if too few are present.\n" + "This can be changed through ScalarAggregateOptions.\n" + "The result is a double for integer and floating point arguments,\n" + "and a decimal with the same bit-width/precision/scale for decimal arguments.\n" + "For integers and floats, NaN is returned if min_count = 0 and\n" + "there are no values. For decimals, null is returned instead."), {"array"}, "ScalarAggregateOptions"}; @@ -683,6 +706,12 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { &default_scalar_aggregate_options); aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, uint64(), func.get()); + AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL128)}, + OutputType(aggregate::ScalarFirstType)), + aggregate::SumInit, func.get(), SimdLevel::NONE); + AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL256)}, + OutputType(aggregate::ScalarFirstType)), + aggregate::SumInit, func.get(), SimdLevel::NONE); aggregate::AddArrayScalarAggKernels(aggregate::SumInit, SignedIntTypes(), int64(), func.get()); aggregate::AddArrayScalarAggKernels(aggregate::SumInit, UnsignedIntTypes(), uint64(), @@ -711,6 +740,12 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { func.get()); aggregate::AddArrayScalarAggKernels(aggregate::MeanInit, NumericTypes(), float64(), func.get()); + AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL128)}, + OutputType(aggregate::ScalarFirstType)), + aggregate::MeanInit, func.get(), SimdLevel::NONE); + AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL256)}, + OutputType(aggregate::ScalarFirstType)), + aggregate::MeanInit, func.get(), SimdLevel::NONE); // Add the SIMD variants for mean #if defined(ARROW_HAVE_RUNTIME_AVX2) if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) { @@ -754,6 +789,12 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { uint64(), func.get()); aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, FloatingPointTypes(), float64(), func.get()); + AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL128)}, + OutputType(aggregate::ScalarFirstType)), + aggregate::ProductInit::Init, func.get(), SimdLevel::NONE); + AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL256)}, + OutputType(aggregate::ScalarFirstType)), + aggregate::ProductInit::Init, func.get(), SimdLevel::NONE); DCHECK_OK(registry->AddFunction(std::move(func))); // any diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc index 8d3e5a0409d..55e9f290e0e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc @@ -26,22 +26,18 @@ namespace aggregate { template struct SumImplAvx2 : public SumImpl { - explicit SumImplAvx2(const ScalarAggregateOptions& options_) { - this->options = options_; - } + using SumImpl::SumImpl; }; template struct MeanImplAvx2 : public MeanImpl { - explicit MeanImplAvx2(const ScalarAggregateOptions& options_) { - this->options = options_; - } + using MeanImpl::MeanImpl; }; Result> SumInitAvx2(KernelContext* ctx, const KernelInitArgs& args) { SumLikeInit visitor( - ctx, *args.inputs[0].type, + ctx, args.inputs[0].type, static_cast(*args.options)); return visitor.Create(); } @@ -49,7 +45,7 @@ Result> SumInitAvx2(KernelContext* ctx, Result> MeanInitAvx2(KernelContext* ctx, const KernelInitArgs& args) { SumLikeInit visitor( - ctx, *args.inputs[0].type, + ctx, args.inputs[0].type, static_cast(*args.options)); return visitor.Create(); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc index 4f8ad74a086..df33dedabba 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc @@ -26,22 +26,18 @@ namespace aggregate { template struct SumImplAvx512 : public SumImpl { - explicit SumImplAvx512(const ScalarAggregateOptions& options_) { - this->options = options_; - } + using SumImpl::SumImpl; }; template struct MeanImplAvx512 : public MeanImpl { - explicit MeanImplAvx512(const ScalarAggregateOptions& options_) { - this->options = options_; - } + using MeanImpl::MeanImpl; }; Result> SumInitAvx512(KernelContext* ctx, const KernelInitArgs& args) { SumLikeInit visitor( - ctx, *args.inputs[0].type, + ctx, args.inputs[0].type, static_cast(*args.options)); return visitor.Create(); } @@ -49,7 +45,7 @@ Result> SumInitAvx512(KernelContext* ctx, Result> MeanInitAvx512(KernelContext* ctx, const KernelInitArgs& args) { SumLikeInit visitor( - ctx, *args.inputs[0].type, + ctx, args.inputs[0].type, static_cast(*args.options)); return visitor.Create(); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index b355a2e1b75..b97af066585 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -60,10 +60,15 @@ void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func); template struct SumImpl : public ScalarAggregator { using ThisType = SumImpl; - using CType = typename ArrowType::c_type; + using CType = typename TypeTraits::CType; using SumType = typename FindAccumulatorType::Type; + using SumCType = typename TypeTraits::CType; using OutputType = typename TypeTraits::ScalarType; + SumImpl(const std::shared_ptr& out_type, + const ScalarAggregateOptions& options_) + : out_type(out_type), options(options_) {} + Status Consume(KernelContext*, const ExecBatch& batch) override { if (batch[0].is_array()) { const auto& data = batch[0].array(); @@ -76,12 +81,9 @@ struct SumImpl : public ScalarAggregator { } if (is_boolean_type::value) { - this->sum += - static_cast(BooleanArray(data).true_count()); + this->sum += static_cast(BooleanArray(data).true_count()); } else { - this->sum += - arrow::compute::detail::SumArray( - *data); + this->sum += arrow::compute::detail::SumArray(*data); } } else { const auto& data = *batch[0].scalar(); @@ -105,22 +107,39 @@ struct SumImpl : public ScalarAggregator { Status Finalize(KernelContext*, Datum* out) override { if ((!options.skip_nulls && this->nulls_observed) || (this->count < options.min_count)) { - out->value = std::make_shared(); + out->value = std::make_shared(out_type); } else { - out->value = MakeScalar(this->sum); + out->value = std::make_shared(this->sum, out_type); } return Status::OK(); } size_t count = 0; bool nulls_observed = false; - typename SumType::c_type sum = 0; + SumCType sum = 0; + std::shared_ptr out_type; ScalarAggregateOptions options; }; template struct MeanImpl : public SumImpl { - Status Finalize(KernelContext*, Datum* out) override { + using SumImpl::SumImpl; + + template + enable_if_decimal FinalizeImpl(Datum* out) { + using SumCType = typename SumImpl::SumCType; + using OutputType = typename SumImpl::OutputType; + if ((!options.skip_nulls && this->nulls_observed) || + (this->count < options.min_count) || (this->count == 0)) { + out->value = std::make_shared(this->out_type); + } else { + const SumCType mean = this->sum / this->count; + out->value = std::make_shared(mean, this->out_type); + } + return Status::OK(); + } + template + enable_if_t::value, Status> FinalizeImpl(Datum* out) { if ((!options.skip_nulls && this->nulls_observed) || (this->count < options.min_count)) { out->value = std::make_shared(); @@ -130,17 +149,19 @@ struct MeanImpl : public SumImpl { } return Status::OK(); } - ScalarAggregateOptions options; + Status Finalize(KernelContext*, Datum* out) override { return FinalizeImpl(out); } + + using SumImpl::options; }; template