Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ Result<Datum> Mean(const Datum& value, const ScalarAggregateOptions& options,
return CallFunction("mean", {value}, &options, ctx);
}

// Convenience wrapper: dispatches to the "product" scalar aggregate function
// in the registry (registered in RegisterScalarAggregateBasic).
Result<Datum> Product(const Datum& value, const ScalarAggregateOptions& options,
                      ExecContext* ctx) {
  return CallFunction("product", {value}, &options, ctx);
}

Result<Datum> Sum(const Datum& value, const ScalarAggregateOptions& options,
ExecContext* ctx) {
return CallFunction("sum", {value}, &options, ctx);
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ Result<Datum> Mean(
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Compute the product of values of a numeric array.
///
/// \param[in] value datum to compute product of, expecting Array or ChunkedArray
/// \param[in] options see ScalarAggregateOptions for more information
/// \param[in] ctx the function execution context, optional
/// \return datum of the computed product as a Scalar
///
/// \since 6.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> Product(
    const Datum& value,
    const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(),
    ExecContext* ctx = NULLPTR);

/// \brief Sum values of a numeric array.
///
/// \param[in] value datum to sum, expecting Array or ChunkedArray
Expand Down
64 changes: 39 additions & 25 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,40 +569,54 @@ TEST(ExecPlanExecution, SourceScalarAggSink) {
}

TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
  // ARROW-9056: scalar aggregation can be done over scalars, taking
  // into account batch.length > 1 (e.g. a partition column)
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  // One batch of scalar columns (each scalar stands for 3 rows) and one batch
  // of regular array columns, over schema {a: int32, b: boolean}.
  BatchesWithSchema scalar_data;
  scalar_data.batches = {
      ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(boolean())},
                        "[[5, false], [5, false], [5, false]]"),
      ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")};
  scalar_data.schema = schema({field("a", int32()), field("b", boolean())});

  // index can't be tested as it's order-dependent
  // mode/quantile can't be tested as they're technically vector kernels
  ASSERT_OK(
      Declaration::Sequence(
          {
              {"source",
               SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false,
                                                                     /*slow=*/false)}},
              {"aggregate", AggregateNodeOptions{
                                /*aggregates=*/{{"all", nullptr},
                                                {"any", nullptr},
                                                {"count", nullptr},
                                                {"mean", nullptr},
                                                {"product", nullptr},
                                                {"stddev", nullptr},
                                                {"sum", nullptr},
                                                {"tdigest", nullptr},
                                                {"variance", nullptr}},
                                /*targets=*/{"b", "b", "a", "a", "a", "a", "a", "a", "a"},
                                /*names=*/
                                {"all(b)", "any(b)", "count(a)", "mean(a)", "product(a)",
                                 "stddev(a)", "sum(a)", "tdigest(a)", "variance(a)"}}},
              {"sink", SinkNodeOptions{&sink_gen}},
          })
          .AddToPlan(plan.get()));

  // Expected values over a = [5,5,5,5,6,7], b = [false,false,false,true,false,true].
  ASSERT_THAT(
      StartAndCollect(plan.get(), sink_gen),
      Finishes(ResultWith(UnorderedElementsAreArray({
          ExecBatchFromJSON(
              {ValueDescr::Scalar(boolean()), ValueDescr::Scalar(boolean()),
               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
               ValueDescr::Scalar(int64()), ValueDescr::Array(float64()),
               ValueDescr::Scalar(float64())},
              R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"),
      }))));
}

Expand Down
141 changes: 139 additions & 2 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "arrow/compute/kernels/aggregate_basic_internal.h"
#include "arrow/compute/kernels/aggregate_internal.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/compute/kernels/util_internal.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/make_unique.h"

Expand Down Expand Up @@ -133,6 +134,109 @@ Result<std::unique_ptr<KernelState>> MeanInit(KernelContext* ctx,
return visitor.Create();
}

// ----------------------------------------------------------------------
// Product implementation

using arrow::compute::internal::to_unsigned;

// Accumulates the running product of the non-null values it consumes.
// Multiplication is done on the unsigned representation (via to_unsigned) so
// that signed integer overflow wraps instead of being undefined behavior.
template <typename ArrowType>
struct ProductImpl : public ScalarAggregator {
  using ThisType = ProductImpl<ArrowType>;
  // Accumulator is the wider type selected by FindAccumulatorType for the
  // input type -- presumably a 64-bit type for narrower integers (TODO confirm).
  using AccType = typename FindAccumulatorType<ArrowType>::Type;
  using ProductType = typename TypeTraits<AccType>::CType;
  using OutputType = typename TypeTraits<AccType>::ScalarType;

  explicit ProductImpl(const ScalarAggregateOptions& options) { this->options = options; }

  // Folds one exec batch into the running product; nulls are skipped.
  // A valid scalar input is multiplied in batch.length times, mirroring a
  // column whose value is constant across all rows of the batch.
  Status Consume(KernelContext*, const ExecBatch& batch) override {
    if (batch[0].is_array()) {
      const auto& data = batch[0].array();
      this->count += data->length - data->GetNullCount();
      VisitArrayDataInline<ArrowType>(
          *data,
          [&](typename TypeTraits<ArrowType>::CType value) {
            this->product =
                static_cast<ProductType>(to_unsigned(this->product) * to_unsigned(value));
          },
          [] {});  // null slots: no-op
    } else {
      const auto& data = *batch[0].scalar();
      // is_valid is 0 or 1, so a null scalar contributes nothing to count.
      this->count += data.is_valid * batch.length;
      if (data.is_valid) {
        for (int64_t i = 0; i < batch.length; i++) {
          auto value = internal::UnboxScalar<ArrowType>::Unbox(data);
          this->product =
              static_cast<ProductType>(to_unsigned(this->product) * to_unsigned(value));
        }
      }
    }
    return Status::OK();
  }

  // Combines the partial product and non-null count from another state
  // (e.g. produced by a different thread).
  Status MergeFrom(KernelContext*, KernelState&& src) override {
    const auto& other = checked_cast<const ThisType&>(src);
    this->count += other.count;
    this->product =
        static_cast<ProductType>(to_unsigned(this->product) * to_unsigned(other.product));
    return Status::OK();
  }

  // Emits a null scalar when fewer than options.min_count non-null values
  // were seen; otherwise wraps the accumulated product in an output scalar.
  Status Finalize(KernelContext*, Datum* out) override {
    if (this->count < options.min_count) {
      out->value = std::make_shared<OutputType>();  // default-constructed => null
    } else {
      out->value = MakeScalar(this->product);
    }
    return Status::OK();
  }

  size_t count = 0;  // number of non-null values consumed so far
  typename AccType::c_type product = 1;  // seeded with the multiplicative identity
  ScalarAggregateOptions options;
};

// Type visitor that builds the ProductImpl kernel state matching the input
// type, or reports NotImplemented for unsupported types.
struct ProductInit {
  std::unique_ptr<KernelState> state;
  KernelContext* ctx;
  const DataType& type;
  const ScalarAggregateOptions& options;

  ProductInit(KernelContext* ctx, const DataType& type,
              const ScalarAggregateOptions& options)
      : ctx(ctx), type(type), options(options) {}

  // Instantiate the product accumulator for a concrete input type.
  template <typename Type>
  Status MakeState() {
    state.reset(new ProductImpl<Type>(options));
    return Status::OK();
  }

  // Supported types: boolean plus the numeric types (minus half-float below).
  Status Visit(const BooleanType&) { return MakeState<BooleanType>(); }

  template <typename Type>
  enable_if_number<Type, Status> Visit(const Type&) {
    return MakeState<Type>();
  }

  // Everything else (including half-float, which enable_if_number accepts
  // but the kernel does not support) is rejected.
  Status Visit(const HalfFloatType&) {
    return Status::NotImplemented("No product implemented");
  }

  Status Visit(const DataType&) {
    return Status::NotImplemented("No product implemented");
  }

  // Run the visitor and hand the constructed state to the caller.
  Result<std::unique_ptr<KernelState>> Create() {
    RETURN_NOT_OK(VisitTypeInline(type, this));
    return std::move(state);
  }

  // Kernel-init entry point registered with the "product" function.
  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
                                                   const KernelInitArgs& args) {
    ProductInit visitor(ctx, *args.inputs[0].type,
                        static_cast<const ScalarAggregateOptions&>(*args.options));
    return visitor.Create();
  }
};

// ----------------------------------------------------------------------
// MinMax implementation

Expand Down Expand Up @@ -290,9 +394,22 @@ struct IndexImpl : public ScalarAggregator {
return Status::OK();
}

const ArgValue desired = internal::UnboxScalar<ArgType>::Unbox(*options.value);

if (batch[0].is_scalar()) {
seen = batch.length;
if (batch[0].scalar()->is_valid) {
const ArgValue v = internal::UnboxScalar<ArgType>::Unbox(*batch[0].scalar());
if (v == desired) {
index = 0;
return Status::Cancelled("Found");
}
}
return Status::OK();
}

auto input = batch[0].array();
seen = input->length;
const ArgValue desired = internal::UnboxScalar<ArgType>::Unbox(*options.value);
int64_t i = 0;

ARROW_UNUSED(internal::VisitArrayValuesInline<ArgType>(
Expand Down Expand Up @@ -455,6 +572,14 @@ const FunctionDoc sum_doc{
{"array"},
"ScalarAggregateOptions"};

// User-facing documentation attached to the "product" aggregate function at
// registration time.
const FunctionDoc product_doc{
    "Compute the product of values in a numeric array",
    ("Null values are ignored by default. Minimum count of non-null\n"
     "values can be set and null is returned if too few are present.\n"
     "This can be changed through ScalarAggregateOptions."),
    {"array"},
    "ScalarAggregateOptions"};

const FunctionDoc mean_doc{
"Compute the mean of a numeric array",
("Null values are ignored by default. Minimum count of non-null\n"
Expand Down Expand Up @@ -513,7 +638,7 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {

func = std::make_shared<ScalarAggregateFunction>("sum", Arity::Unary(), &sum_doc,
&default_scalar_aggregate_options);
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, int64(),
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, uint64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, SignedIntTypes(), int64(),
func.get());
Expand Down Expand Up @@ -574,6 +699,18 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {

DCHECK_OK(registry->AddFunction(std::move(func)));

func = std::make_shared<ScalarAggregateFunction>(
"product", Arity::Unary(), &product_doc, &default_scalar_aggregate_options);
aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, {boolean()}, uint64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, SignedIntTypes(),
int64(), func.get());
aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, UnsignedIntTypes(),
uint64(), func.get());
aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, FloatingPointTypes(),
float64(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

// any
func = std::make_shared<ScalarAggregateFunction>("any", Arity::Unary(), &any_doc,
&default_scalar_aggregate_options);
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ struct TDigestImpl : public ScalarAggregator {
} else {
const CType value = UnboxScalar<ArrowType>::Unbox(*batch[0].scalar());
if (batch[0].scalar()->is_valid) {
this->tdigest.NanAdd(value);
for (int64_t i = 0; i < batch.length; i++) {
this->tdigest.NanAdd(value);
}
}
}
return Status::OK();
Expand Down
Loading