Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 70 additions & 29 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,30 +116,26 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,

// Sum aggregator bound to SimdLevel::NONE; all behaviour comes from SumImpl.
// NOTE(review): this text is a rendered diff, so the hand-written constructor and the
// inheriting-constructor declaration below are presumably the pre- and post-change
// versions of the same lines — confirm which one the real file keeps.
template <typename ArrowType>
struct SumImplDefault : public SumImpl<ArrowType, SimdLevel::NONE> {
// Hand-written form: copies the supplied options into this->options.
explicit SumImplDefault(const ScalarAggregateOptions& options_) {
this->options = options_;
}
// Inheriting form: re-exports every SumImpl constructor unchanged.
using SumImpl<ArrowType, SimdLevel::NONE>::SumImpl;
};

// Mean aggregator bound to SimdLevel::NONE; all behaviour comes from MeanImpl.
// NOTE(review): this text is a rendered diff, so the hand-written constructor and the
// inheriting-constructor declaration below are presumably the pre- and post-change
// versions of the same lines — confirm which one the real file keeps.
template <typename ArrowType>
struct MeanImplDefault : public MeanImpl<ArrowType, SimdLevel::NONE> {
// Hand-written form: copies the supplied options into this->options.
explicit MeanImplDefault(const ScalarAggregateOptions& options_) {
this->options = options_;
}
// Inheriting form: re-exports every MeanImpl constructor unchanged.
using MeanImpl<ArrowType, SimdLevel::NONE>::MeanImpl;
};

// Kernel-init entry point for the non-SIMD sum: builds a SumLikeInit visitor over
// SumImplDefault from the first input's type and the options (downcast to
// ScalarAggregateOptions), then returns the result of visitor.Create().
// NOTE(review): rendered-diff text — the two `ctx, ...type,` lines are presumably the
// old (dereferenced type) and new (type handle passed as-is) argument lists; only one
// of them belongs in the real call. Confirm.
Result<std::unique_ptr<KernelState>> SumInit(KernelContext* ctx,
const KernelInitArgs& args) {
SumLikeInit<SumImplDefault> visitor(
ctx, *args.inputs[0].type,
ctx, args.inputs[0].type,
static_cast<const ScalarAggregateOptions&>(*args.options));
return visitor.Create();
}

// Kernel-init entry point for the non-SIMD mean: builds a SumLikeInit visitor over
// MeanImplDefault from the first input's type and the options (downcast to
// ScalarAggregateOptions), then returns the result of visitor.Create().
// NOTE(review): rendered-diff text — the two `ctx, ...type,` lines are presumably the
// old (dereferenced type) and new (type handle passed as-is) argument lists; only one
// of them belongs in the real call. Confirm.
Result<std::unique_ptr<KernelState>> MeanInit(KernelContext* ctx,
const KernelInitArgs& args) {
SumLikeInit<MeanImplDefault> visitor(
ctx, *args.inputs[0].type,
ctx, args.inputs[0].type,
static_cast<const ScalarAggregateOptions&>(*args.options));
return visitor.Create();
}
Expand All @@ -156,7 +152,13 @@ struct ProductImpl : public ScalarAggregator {
using ProductType = typename TypeTraits<AccType>::CType;
using OutputType = typename TypeTraits<AccType>::ScalarType;

// NOTE(review): rendered-diff text — the two constructors below are presumably the
// pre-change (options only) and post-change (output type + options) forms. Confirm.

// Options-only form: stores the aggregate options; other members keep their
// in-class defaults.
explicit ProductImpl(const ScalarAggregateOptions& options) { this->options = options; }
// Output-type form: records the result type and options, starts count at 0 with no
// nulls observed, and seeds the accumulator with MultiplyTraits<AccType>::one(*out_type).
explicit ProductImpl(const std::shared_ptr<DataType>& out_type,
const ScalarAggregateOptions& options)
: out_type(out_type),
options(options),
count(0),
product(MultiplyTraits<AccType>::one(*out_type)),
nulls_observed(false) {}

Status Consume(KernelContext*, const ExecBatch& batch) override {
if (batch[0].is_array()) {
Expand All @@ -169,11 +171,11 @@ struct ProductImpl : public ScalarAggregator {
return Status::OK();
}

VisitArrayDataInline<ArrowType>(
internal::VisitArrayValuesInline<ArrowType>(
*data,
[&](typename TypeTraits<ArrowType>::CType value) {
this->product =
static_cast<ProductType>(to_unsigned(this->product) * to_unsigned(value));
MultiplyTraits<AccType>::Multiply(*out_type, this->product, value);
},
[] {});
} else {
Expand All @@ -184,7 +186,7 @@ struct ProductImpl : public ScalarAggregator {
for (int64_t i = 0; i < batch.length; i++) {
auto value = internal::UnboxScalar<ArrowType>::Unbox(data);
this->product =
static_cast<ProductType>(to_unsigned(this->product) * to_unsigned(value));
MultiplyTraits<AccType>::Multiply(*out_type, this->product, value);
}
}
}
Expand All @@ -195,34 +197,35 @@ struct ProductImpl : public ScalarAggregator {
const auto& other = checked_cast<const ThisType&>(src);
this->count += other.count;
this->product =
static_cast<ProductType>(to_unsigned(this->product) * to_unsigned(other.product));
MultiplyTraits<AccType>::Multiply(*out_type, this->product, other.product);
this->nulls_observed = this->nulls_observed || other.nulls_observed;
return Status::OK();
}

// Writes the aggregation result into out->value and returns Status::OK().
// The first branch is taken when nulls were observed while options.skip_nulls is
// false, or when count is below options.min_count; it stores an OutputType built
// without a value (presumably a null scalar — confirm). Otherwise the scalar carries
// this->product.
// NOTE(review): rendered-diff text — each branch shows two assignments, presumably the
// old form (no explicit type) followed by the new form (constructed with out_type).
Status Finalize(KernelContext*, Datum* out) override {
if ((!options.skip_nulls && this->nulls_observed) ||
(this->count < options.min_count)) {
out->value = std::make_shared<OutputType>();
out->value = std::make_shared<OutputType>(out_type);
} else {
out->value = MakeScalar(this->product);
out->value = std::make_shared<OutputType>(this->product, out_type);
}
return Status::OK();
}

// Aggregator state.
// NOTE(review): rendered-diff text — count, nulls_observed and product each appear
// twice, presumably once with the old in-class initializers and once in the new form
// that is initialized by the constructor. Only one set belongs in the real struct.
size_t count = 0;
bool nulls_observed = false;
typename AccType::c_type product = 1;
// Type handed to MultiplyTraits and used when building the output scalar.
std::shared_ptr<DataType> out_type;
// Supplies skip_nulls and min_count, which Finalize consults.
ScalarAggregateOptions options;
// Compared against options.min_count in Finalize.
size_t count;
// Running product accumulator.
ProductType product;
// Set when a null has been seen; combined with other.nulls_observed on merge.
bool nulls_observed;
};

struct ProductInit {
// Kernel state produced by the matching Visit overload and moved out by Create().
std::unique_ptr<KernelState> state;
KernelContext* ctx;
// Input type that Create() dispatches on.
// NOTE(review): rendered-diff text — the two `type` declarations are presumably the old
// (DataType reference) and new (shared_ptr reference) versions of one member. Confirm.
const DataType& type;
const std::shared_ptr<DataType>& type;
// Held by reference and forwarded to each ProductImpl that a Visit overload creates.
const ScalarAggregateOptions& options;

// Stores the context, input type and options for later use by Create()/Visit().
// NOTE(review): rendered-diff text — the two signature lines are presumably the old
// (const DataType&) and new (const std::shared_ptr<DataType>&) first lines of a single
// constructor. Confirm.
ProductInit(KernelContext* ctx, const DataType& type,
ProductInit(KernelContext* ctx, const std::shared_ptr<DataType>& type,
const ScalarAggregateOptions& options)
: ctx(ctx), type(type), options(options) {}

Expand All @@ -235,24 +238,32 @@ struct ProductInit {
}

// Boolean input: installs a ProductImpl<BooleanType> as the kernel state.
// NOTE(review): rendered-diff text — the first state.reset(...) is presumably the old
// options-only call; the `ty` lines are presumably its replacement, which passes the
// accumulator type's singleton as the output type. Confirm.
Status Visit(const BooleanType&) {
state.reset(new ProductImpl<BooleanType>(options));
auto ty = TypeTraits<typename ProductImpl<BooleanType>::AccType>::type_singleton();
state.reset(new ProductImpl<BooleanType>(ty, options));
return Status::OK();
}

// Numeric input (selected via enable_if_number): installs a ProductImpl<Type> as the
// kernel state.
// NOTE(review): rendered-diff text — the first state.reset(...) is presumably the old
// options-only call; the `ty` lines are presumably its replacement, which passes the
// accumulator type's singleton as the output type. Confirm.
template <typename Type>
enable_if_number<Type, Status> Visit(const Type&) {
state.reset(new ProductImpl<Type>(options));
auto ty = TypeTraits<typename ProductImpl<Type>::AccType>::type_singleton();
state.reset(new ProductImpl<Type>(ty, options));
return Status::OK();
}

// Decimal input (selected via enable_if_decimal): the visitor's own `type` is passed
// to ProductImpl as the output type, together with the aggregate options.
template <typename Type>
enable_if_decimal<Type, Status> Visit(const Type&) {
  state = std::unique_ptr<KernelState>(new ProductImpl<Type>(type, options));
  return Status::OK();
}

// Dispatches on the stored input type through VisitTypeInline, so that the matching
// Visit overload fills `state`, then hands the state to the caller by move.
// NOTE(review): rendered-diff text — the two RETURN_NOT_OK lines are presumably the old
// (type held by reference) and new (type held via shared_ptr, dereferenced) forms of
// one call. Confirm.
Result<std::unique_ptr<KernelState>> Create() {
RETURN_NOT_OK(VisitTypeInline(type, this));
RETURN_NOT_OK(VisitTypeInline(*type, this));
return std::move(state);
}

// Kernel-init entry point for product: constructs a ProductInit visitor from the first
// input's type and the options (downcast to ScalarAggregateOptions) and returns the
// result of Create().
// NOTE(review): rendered-diff text — the two `ProductInit visitor(` lines are presumably
// the old (dereferenced type) and new (type handle passed as-is) openings of a single
// declaration. Confirm.
static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
const KernelInitArgs& args) {
ProductInit visitor(ctx, *args.inputs[0].type,
ProductInit visitor(ctx, args.inputs[0].type,
static_cast<const ScalarAggregateOptions&>(*args.options));
return visitor.Create();
}
Expand Down Expand Up @@ -550,7 +561,8 @@ void AddBasicAggKernels(KernelInit init,
SimdLevel::type simd_level) {
for (const auto& ty : types) {
// array[InT] -> scalar[OutT]
auto sig = KernelSignature::Make({InputType::Array(ty)}, ValueDescr::Scalar(out_ty));
auto sig =
KernelSignature::Make({InputType::Array(ty->id())}, ValueDescr::Scalar(out_ty));
AddAggKernel(std::move(sig), init, func, simd_level);
}
}
Expand All @@ -561,7 +573,8 @@ void AddScalarAggKernels(KernelInit init,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
// scalar[InT] -> scalar[OutT]
auto sig = KernelSignature::Make({InputType::Scalar(ty)}, ValueDescr::Scalar(out_ty));
auto sig =
KernelSignature::Make({InputType::Scalar(ty->id())}, ValueDescr::Scalar(out_ty));
AddAggKernel(std::move(sig), init, func, SimdLevel::NONE);
}
}
Expand Down Expand Up @@ -598,6 +611,13 @@ void AddMinMaxKernels(KernelInit init,
}
}

// Output-type resolver: copies the first argument's descriptor and forces its shape
// to SCALAR, leaving the rest of the descriptor as it was. The KernelContext is unused.
// `descrs` must be non-empty.
Result<ValueDescr> ScalarFirstType(KernelContext*,
                                   const std::vector<ValueDescr>& descrs) {
  ValueDescr scalar_descr = descrs[0];
  scalar_descr.shape = ValueDescr::SCALAR;
  return scalar_descr;
}

} // namespace aggregate

namespace internal {
Expand Down Expand Up @@ -628,9 +648,12 @@ const FunctionDoc product_doc{
// User-facing documentation strings for the mean aggregate: a one-line summary, a
// longer description, the argument name list and the options class name.
// NOTE(review): rendered-diff text — the description shows both the old wording
// ("always computed as a double") and the new wording (double for integers/floats,
// decimal for decimals) back to back; only one version belongs in the real literal.
const FunctionDoc mean_doc{
"Compute the mean of a numeric array",
("Null values are ignored by default. Minimum count of non-null\n"
"values can be set and null is returned if too few are "
"present.\nThis can be changed through ScalarAggregateOptions.\n"
"The result is always computed as a double, regardless of the input types."),
"values can be set and null is returned if too few are present.\n"
"This can be changed through ScalarAggregateOptions.\n"
"The result is a double for integer and floating point arguments,\n"
"and a decimal with the same bit-width/precision/scale for decimal arguments.\n"
"For integers and floats, NaN is returned if min_count = 0 and\n"
"there are no values. For decimals, null is returned instead."),
{"array"},
"ScalarAggregateOptions"};

Expand Down Expand Up @@ -683,6 +706,12 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
&default_scalar_aggregate_options);
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, uint64(),
func.get());
AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL128)},
OutputType(aggregate::ScalarFirstType)),
aggregate::SumInit, func.get(), SimdLevel::NONE);
AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL256)},
OutputType(aggregate::ScalarFirstType)),
aggregate::SumInit, func.get(), SimdLevel::NONE);
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, SignedIntTypes(), int64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, UnsignedIntTypes(), uint64(),
Expand Down Expand Up @@ -711,6 +740,12 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::MeanInit, NumericTypes(), float64(),
func.get());
AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL128)},
OutputType(aggregate::ScalarFirstType)),
aggregate::MeanInit, func.get(), SimdLevel::NONE);
AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL256)},
OutputType(aggregate::ScalarFirstType)),
aggregate::MeanInit, func.get(), SimdLevel::NONE);
// Add the SIMD variants for mean
#if defined(ARROW_HAVE_RUNTIME_AVX2)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) {
Expand Down Expand Up @@ -754,6 +789,12 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
uint64(), func.get());
aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, FloatingPointTypes(),
float64(), func.get());
AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL128)},
OutputType(aggregate::ScalarFirstType)),
aggregate::ProductInit::Init, func.get(), SimdLevel::NONE);
AddAggKernel(KernelSignature::Make({InputType(Type::DECIMAL256)},
OutputType(aggregate::ScalarFirstType)),
aggregate::ProductInit::Init, func.get(), SimdLevel::NONE);
DCHECK_OK(registry->AddFunction(std::move(func)));

// any
Expand Down
12 changes: 4 additions & 8 deletions cpp/src/arrow/compute/kernels/aggregate_basic_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,26 @@ namespace aggregate {

// Sum aggregator bound to SimdLevel::AVX2; all behaviour comes from SumImpl.
// NOTE(review): this text is a rendered diff, so the hand-written constructor and the
// inheriting-constructor declaration below are presumably the pre- and post-change
// versions of the same lines — confirm which one the real file keeps.
template <typename ArrowType>
struct SumImplAvx2 : public SumImpl<ArrowType, SimdLevel::AVX2> {
// Hand-written form: copies the supplied options into this->options.
explicit SumImplAvx2(const ScalarAggregateOptions& options_) {
this->options = options_;
}
// Inheriting form: re-exports every SumImpl constructor unchanged.
using SumImpl<ArrowType, SimdLevel::AVX2>::SumImpl;
};

// Mean aggregator bound to SimdLevel::AVX2; all behaviour comes from MeanImpl.
// NOTE(review): this text is a rendered diff, so the hand-written constructor and the
// inheriting-constructor declaration below are presumably the pre- and post-change
// versions of the same lines — confirm which one the real file keeps.
template <typename ArrowType>
struct MeanImplAvx2 : public MeanImpl<ArrowType, SimdLevel::AVX2> {
// Hand-written form: copies the supplied options into this->options.
explicit MeanImplAvx2(const ScalarAggregateOptions& options_) {
this->options = options_;
}
// Inheriting form: re-exports every MeanImpl constructor unchanged.
using MeanImpl<ArrowType, SimdLevel::AVX2>::MeanImpl;
};

// Kernel-init entry point for the AVX2 sum: builds a SumLikeInit visitor over
// SumImplAvx2 from the first input's type and the options (downcast to
// ScalarAggregateOptions), then returns the result of visitor.Create().
// NOTE(review): rendered-diff text — the two `ctx, ...type,` lines are presumably the
// old (dereferenced type) and new (type handle passed as-is) argument lists; only one
// of them belongs in the real call. Confirm.
Result<std::unique_ptr<KernelState>> SumInitAvx2(KernelContext* ctx,
const KernelInitArgs& args) {
SumLikeInit<SumImplAvx2> visitor(
ctx, *args.inputs[0].type,
ctx, args.inputs[0].type,
static_cast<const ScalarAggregateOptions&>(*args.options));
return visitor.Create();
}

// Kernel-init entry point for the AVX2 mean: builds a SumLikeInit visitor over
// MeanImplAvx2 from the first input's type and the options (downcast to
// ScalarAggregateOptions), then returns the result of visitor.Create().
// NOTE(review): rendered-diff text — the two `ctx, ...type,` lines are presumably the
// old (dereferenced type) and new (type handle passed as-is) argument lists; only one
// of them belongs in the real call. Confirm.
Result<std::unique_ptr<KernelState>> MeanInitAvx2(KernelContext* ctx,
const KernelInitArgs& args) {
SumLikeInit<MeanImplAvx2> visitor(
ctx, *args.inputs[0].type,
ctx, args.inputs[0].type,
static_cast<const ScalarAggregateOptions&>(*args.options));
return visitor.Create();
}
Expand Down
12 changes: 4 additions & 8 deletions cpp/src/arrow/compute/kernels/aggregate_basic_avx512.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,26 @@ namespace aggregate {

// Sum aggregator bound to SimdLevel::AVX512; all behaviour comes from SumImpl.
// NOTE(review): this text is a rendered diff, so the hand-written constructor and the
// inheriting-constructor declaration below are presumably the pre- and post-change
// versions of the same lines — confirm which one the real file keeps.
template <typename ArrowType>
struct SumImplAvx512 : public SumImpl<ArrowType, SimdLevel::AVX512> {
// Hand-written form: copies the supplied options into this->options.
explicit SumImplAvx512(const ScalarAggregateOptions& options_) {
this->options = options_;
}
// Inheriting form: re-exports every SumImpl constructor unchanged.
using SumImpl<ArrowType, SimdLevel::AVX512>::SumImpl;
};

// Mean aggregator bound to SimdLevel::AVX512; all behaviour comes from MeanImpl.
// NOTE(review): this text is a rendered diff, so the hand-written constructor and the
// inheriting-constructor declaration below are presumably the pre- and post-change
// versions of the same lines — confirm which one the real file keeps.
template <typename ArrowType>
struct MeanImplAvx512 : public MeanImpl<ArrowType, SimdLevel::AVX512> {
// Hand-written form: copies the supplied options into this->options.
explicit MeanImplAvx512(const ScalarAggregateOptions& options_) {
this->options = options_;
}
// Inheriting form: re-exports every MeanImpl constructor unchanged.
using MeanImpl<ArrowType, SimdLevel::AVX512>::MeanImpl;
};

// Kernel-init entry point for the AVX512 sum: builds a SumLikeInit visitor over
// SumImplAvx512 from the first input's type and the options (downcast to
// ScalarAggregateOptions), then returns the result of visitor.Create().
// NOTE(review): rendered-diff text — the two `ctx, ...type,` lines are presumably the
// old (dereferenced type) and new (type handle passed as-is) argument lists; only one
// of them belongs in the real call. Confirm.
Result<std::unique_ptr<KernelState>> SumInitAvx512(KernelContext* ctx,
const KernelInitArgs& args) {
SumLikeInit<SumImplAvx512> visitor(
ctx, *args.inputs[0].type,
ctx, args.inputs[0].type,
static_cast<const ScalarAggregateOptions&>(*args.options));
return visitor.Create();
}

// Kernel-init entry point for the AVX512 mean: builds a SumLikeInit visitor over
// MeanImplAvx512 from the first input's type and the options (downcast to
// ScalarAggregateOptions), then returns the result of visitor.Create().
// NOTE(review): rendered-diff text — the two `ctx, ...type,` lines are presumably the
// old (dereferenced type) and new (type handle passed as-is) argument lists; only one
// of them belongs in the real call. Confirm.
Result<std::unique_ptr<KernelState>> MeanInitAvx512(KernelContext* ctx,
const KernelInitArgs& args) {
SumLikeInit<MeanImplAvx512> visitor(
ctx, *args.inputs[0].type,
ctx, args.inputs[0].type,
static_cast<const ScalarAggregateOptions&>(*args.options));
return visitor.Create();
}
Expand Down
Loading