From a48e46dc3ef2fcfc546e452fbc6d93927ff72416 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 5 Aug 2021 15:22:49 -0400
Subject: [PATCH 1/3] ARROW-13575: [C++] Add hash_product kernel
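
This adds a "product" scalar aggregate and a "hash_product" grouped aggregate
for boolean, integer, and floating-point inputs. Integer overflow wraps around
(the accumulation is done through unsigned arithmetic), and ScalarAggregateOptions
(skip_nulls, min_count) are honoured. Scalar inputs (with batch.length taken into
account) are now consumed by several scalar aggregates, and the grouped sum/mean
kernels also respect min_count.

A rough sketch of how the new scalar aggregate might be called (illustrative
only, not part of this patch):

    #include <iostream>
    #include <memory>

    #include "arrow/array/builder_primitive.h"
    #include "arrow/compute/api_aggregate.h"
    #include "arrow/result.h"
    #include "arrow/status.h"

    arrow::Status ProductExample() {
      arrow::Int64Builder builder;
      ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3, 4, 5}));
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> values, builder.Finish());

      // Nulls are skipped by default; min_count turns the result into null
      // when too few non-null values are present.
      ARROW_ASSIGN_OR_RAISE(arrow::Datum result, arrow::compute::Product(values));
      std::cout << result.scalar()->ToString() << std::endl;  // 120
      return arrow::Status::OK();
    }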
---
cpp/src/arrow/compute/api_aggregate.cc | 5 +
cpp/src/arrow/compute/api_aggregate.h | 15 ++
cpp/src/arrow/compute/exec/plan_test.cc | 64 ++++---
.../arrow/compute/kernels/aggregate_basic.cc | 141 ++++++++++++++-
.../compute/kernels/aggregate_tdigest.cc | 4 +-
.../arrow/compute/kernels/aggregate_test.cc | 146 ++++++++++++++++
.../compute/kernels/aggregate_var_std.cc | 59 +++----
.../arrow/compute/kernels/hash_aggregate.cc | 161 +++++++++++++++++-
.../compute/kernels/hash_aggregate_test.cc | 103 +++++++++--
.../compute/kernels/scalar_arithmetic.cc | 95 ++++++-----
cpp/src/arrow/compute/kernels/util_internal.h | 11 ++
docs/source/cpp/compute.rst | 10 +-
docs/source/python/api/compute.rst | 1 +
13 files changed, 677 insertions(+), 138 deletions(-)
diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc
index 1b00c366bfd..2261333a880 100644
--- a/cpp/src/arrow/compute/api_aggregate.cc
+++ b/cpp/src/arrow/compute/api_aggregate.cc
@@ -145,6 +145,11 @@ Result<Datum> Mean(const Datum& value, const ScalarAggregateOptions& options,
return CallFunction("mean", {value}, &options, ctx);
}
+Result<Datum> Product(const Datum& value, const ScalarAggregateOptions& options,
+ ExecContext* ctx) {
+ return CallFunction("product", {value}, &options, ctx);
+}
+
Result<Datum> Sum(const Datum& value, const ScalarAggregateOptions& options,
ExecContext* ctx) {
return CallFunction("sum", {value}, &options, ctx);
diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h
index d66d4f1517c..d73be1af97a 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -169,6 +169,21 @@ Result<Datum> Mean(
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(),
ExecContext* ctx = NULLPTR);
+/// \brief Compute the product of values of a numeric array.
+///
+/// \param[in] value datum to compute product of, expecting Array or ChunkedArray
+/// \param[in] options see ScalarAggregateOptions for more information
+/// \param[in] ctx the function execution context, optional
+/// \return datum of the computed product as a Scalar
+///
+/// \since 6.0.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Result<Datum> Product(
+ const Datum& value,
+ const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(),
+ ExecContext* ctx = NULLPTR);
+
/// \brief Sum values of a numeric array.
///
/// \param[in] value datum to sum, expecting Array or ChunkedArray
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 1ca620304cf..3b3d39fd36a 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -569,40 +569,54 @@ TEST(ExecPlanExecution, SourceScalarAggSink) {
}
TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
+ // ARROW-9056: scalar aggregation can be done over scalars, taking
+ // into account batch.length > 1 (e.g. a partition column)
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;
BatchesWithSchema scalar_data;
scalar_data.batches = {
- ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(int32()),
- ValueDescr::Scalar(int32())},
- "[[5, 5, 5], [5, 5, 5], [5, 5, 5]]"),
- ExecBatchFromJSON({int32(), int32(), int32()},
- "[[5, 5, 5], [6, 6, 6], [7, 7, 7]]")};
- scalar_data.schema =
- schema({field("a", int32()), field("b", int32()), field("c", int32())});
-
- ASSERT_OK(Declaration::Sequence(
- {
- {"source", SourceNodeOptions{scalar_data.schema,
- scalar_data.gen(/*parallel=*/false,
- /*slow=*/false)}},
- {"aggregate",
- AggregateNodeOptions{
- /*aggregates=*/{
- {"count", nullptr}, {"sum", nullptr}, {"mean", nullptr}},
- /*targets=*/{"a", "b", "c"},
- /*names=*/{"count(a)", "sum(b)", "mean(c)"}}},
- {"sink", SinkNodeOptions{&sink_gen}},
- })
- .AddToPlan(plan.get()));
+ ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(boolean())},
+ "[[5, false], [5, false], [5, false]]"),
+ ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")};
+ scalar_data.schema = schema({field("a", int32()), field("b", boolean())});
+
+ // index can't be tested as it's order-dependent
+ // mode/quantile can't be tested as they're technically vector kernels
+ ASSERT_OK(
+ Declaration::Sequence(
+ {
+ {"source",
+ SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false,
+ /*slow=*/false)}},
+ {"aggregate", AggregateNodeOptions{
+ /*aggregates=*/{{"all", nullptr},
+ {"any", nullptr},
+ {"count", nullptr},
+ {"mean", nullptr},
+ {"product", nullptr},
+ {"stddev", nullptr},
+ {"sum", nullptr},
+ {"tdigest", nullptr},
+ {"variance", nullptr}},
+ /*targets=*/{"b", "b", "a", "a", "a", "a", "a", "a", "a"},
+ /*names=*/
+ {"all(b)", "any(b)", "count(a)", "mean(a)", "product(a)",
+ "stddev(a)", "sum(a)", "tdigest(a)", "variance(a)"}}},
+ {"sink", SinkNodeOptions{&sink_gen}},
+ })
+ .AddToPlan(plan.get()));
ASSERT_THAT(
StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray({
- ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(int64()),
- ValueDescr::Scalar(float64())},
- "[[6, 33, 5.5]]"),
+ ExecBatchFromJSON(
+ {ValueDescr::Scalar(boolean()), ValueDescr::Scalar(boolean()),
+ ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
+ ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
+ ValueDescr::Scalar(int64()), ValueDescr::Array(float64()),
+ ValueDescr::Scalar(float64())},
+ R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"),
}))));
}
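(For reference, not part of the change: the expected batch above follows from the
six input rows, where "a" takes the values 5, 5, 5, 5, 6, 7 and "b" takes false,
false, false, true, false, true. Hence count(a) = 6, sum(a) = 33, mean(a) = 33/6
= 5.5, product(a) = 5*5*5*5*6*7 = 26250, variance(a) = (4*0.25 + 0.25 + 2.25)/6 =
3.5/6 ≈ 0.58333, stddev(a) = sqrt(3.5/6) ≈ 0.76376, all(b) = false, any(b) = true,
and tdigest(a) yields the median 5.0.)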
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index a7df66695b2..bc92c203687 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -19,6 +19,7 @@
#include "arrow/compute/kernels/aggregate_basic_internal.h"
#include "arrow/compute/kernels/aggregate_internal.h"
#include "arrow/compute/kernels/common.h"
+#include "arrow/compute/kernels/util_internal.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/make_unique.h"
@@ -133,6 +134,109 @@ Result<std::unique_ptr<KernelState>> MeanInit(KernelContext* ctx,
return visitor.Create();
}
+// ----------------------------------------------------------------------
+// Product implementation
+
+using arrow::compute::internal::to_unsigned;
+
+template <typename ArrowType>
+struct ProductImpl : public ScalarAggregator {
+ using ThisType = ProductImpl<ArrowType>;
+ using AccType = typename FindAccumulatorType<ArrowType>::Type;
+ using ProductType = typename TypeTraits<AccType>::CType;
+ using OutputType = typename TypeTraits<AccType>::ScalarType;
+
+ explicit ProductImpl(const ScalarAggregateOptions& options) { this->options = options; }
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ const auto& data = batch[0].array();
+ this->count += data->length - data->GetNullCount();
+ VisitArrayDataInline(
+ *data,
+ [&](typename TypeTraits<ArrowType>::CType value) {
+ this->product =
+ static_cast(to_unsigned(this->product) * to_unsigned(value));
+ },
+ [] {});
+ } else {
+ const auto& data = *batch[0].scalar();
+ this->count += data.is_valid * batch.length;
+ if (data.is_valid) {
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto value = internal::UnboxScalar<ArrowType>::Unbox(data);
+ this->product =
+ static_cast(to_unsigned(this->product) * to_unsigned(value));
+ }
+ }
+ }
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other = checked_cast<const ThisType&>(src);
+ this->count += other.count;
+ this->product =
+ static_cast(to_unsigned(this->product) * to_unsigned(other.product));
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext*, Datum* out) override {
+ if (this->count < options.min_count) {
+ out->value = std::make_shared<OutputType>();
+ } else {
+ out->value = MakeScalar(this->product);
+ }
+ return Status::OK();
+ }
+
+ size_t count = 0;
+ typename AccType::c_type product = 1;
+ ScalarAggregateOptions options;
+};
+
+struct ProductInit {
+ std::unique_ptr<KernelState> state;
+ KernelContext* ctx;
+ const DataType& type;
+ const ScalarAggregateOptions& options;
+
+ ProductInit(KernelContext* ctx, const DataType& type,
+ const ScalarAggregateOptions& options)
+ : ctx(ctx), type(type), options(options) {}
+
+ Status Visit(const DataType&) {
+ return Status::NotImplemented("No product implemented");
+ }
+
+ Status Visit(const HalfFloatType&) {
+ return Status::NotImplemented("No product implemented");
+ }
+
+ Status Visit(const BooleanType&) {
+ state.reset(new ProductImpl<BooleanType>(options));
+ return Status::OK();
+ }
+
+ template <typename Type>
+ enable_if_number<Type, Status> Visit(const Type&) {
+ state.reset(new ProductImpl<Type>(options));
+ return Status::OK();
+ }
+
+ Result<std::unique_ptr<KernelState>> Create() {
+ RETURN_NOT_OK(VisitTypeInline(type, this));
+ return std::move(state);
+ }
+
+ static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+ const KernelInitArgs& args) {
+ ProductInit visitor(ctx, *args.inputs[0].type,
+ static_cast<const ScalarAggregateOptions&>(*args.options));
+ return visitor.Create();
+ }
+};
+
// ----------------------------------------------------------------------
// MinMax implementation
@@ -290,9 +394,22 @@ struct IndexImpl : public ScalarAggregator {
return Status::OK();
}
+ const ArgValue desired = internal::UnboxScalar<ArgType>::Unbox(*options.value);
+
+ if (batch[0].is_scalar()) {
+ seen = batch.length;
+ if (batch[0].scalar()->is_valid) {
+ const ArgValue v = internal::UnboxScalar<ArgType>::Unbox(*batch[0].scalar());
+ if (v == desired) {
+ index = 0;
+ return Status::Cancelled("Found");
+ }
+ }
+ return Status::OK();
+ }
+
auto input = batch[0].array();
seen = input->length;
- const ArgValue desired = internal::UnboxScalar<ArgType>::Unbox(*options.value);
int64_t i = 0;
ARROW_UNUSED(internal::VisitArrayValuesInline(
@@ -455,6 +572,14 @@ const FunctionDoc sum_doc{
{"array"},
"ScalarAggregateOptions"};
+const FunctionDoc product_doc{
+ "Compute the product of values in a numeric array",
+ ("Null values are ignored by default. Minimum count of non-null\n"
+ "values can be set and null is returned if too few are present.\n"
+ "This can be changed through ScalarAggregateOptions."),
+ {"array"},
+ "ScalarAggregateOptions"};
+
const FunctionDoc mean_doc{
"Compute the mean of a numeric array",
("Null values are ignored by default. Minimum count of non-null\n"
@@ -513,7 +638,7 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
func = std::make_shared("sum", Arity::Unary(), &sum_doc,
&default_scalar_aggregate_options);
- aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, int64(),
+ aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, uint64(),
func.get());
aggregate::AddArrayScalarAggKernels(aggregate::SumInit, SignedIntTypes(), int64(),
func.get());
@@ -574,6 +699,18 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "product", Arity::Unary(), &product_doc, &default_scalar_aggregate_options);
+ aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, {boolean()}, uint64(),
+ func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, SignedIntTypes(),
+ int64(), func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, UnsignedIntTypes(),
+ uint64(), func.get());
+ aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, FloatingPointTypes(),
+ float64(), func.get());
+ DCHECK_OK(registry->AddFunction(std::move(func)));
+
// any
func = std::make_shared("any", Arity::Unary(), &any_doc,
&default_scalar_aggregate_options);
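The to_unsigned() helper pulled in from util_internal.h exists because signed
integer overflow is undefined behaviour in C++, while unsigned arithmetic wraps
modulo 2^N. A minimal standalone sketch of the same pattern (illustrative only;
the names here are not from this patch):

    #include <cstdint>
    #include <type_traits>

    // Multiply with well-defined wraparound by routing through the unsigned type.
    template <typename Int>
    Int WrappingMultiply(Int a, Int b) {
      using Unsigned = typename std::make_unsigned<Int>::type;
      return static_cast<Int>(static_cast<Unsigned>(a) * static_cast<Unsigned>(b));
    }

    // WrappingMultiply<int64_t>(8589934592, 8589934593) == 8589934592, which is
    // exactly what the Overflow tests added below expect.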
diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
index 4c261604c85..fd0533edd4f 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
@@ -55,7 +55,9 @@ struct TDigestImpl : public ScalarAggregator {
} else {
const CType value = UnboxScalar<ArrowType>::Unbox(*batch[0].scalar());
if (batch[0].scalar()->is_valid) {
- this->tdigest.NanAdd(value);
+ for (int64_t i = 0; i < batch.length; i++) {
+ this->tdigest.NanAdd(value);
+ }
}
}
return Status::OK();
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index ec12b0913b8..db96bc3cad8 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -189,6 +189,53 @@ TEST(TestBooleanAggregation, Sum) {
ResultWith(Datum(MakeNullScalar(uint64()))));
}
+TEST(TestBooleanAggregation, Product) {
+ const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults();
+ ValidateBooleanAgg("[]", std::make_shared(), options);
+ ValidateBooleanAgg("[null]", std::make_shared(), options);
+ ValidateBooleanAgg("[null, false]", std::make_shared(0),
+ options);
+ ValidateBooleanAgg("[true]", std::make_shared(1), options);
+ ValidateBooleanAgg("[true, false, true]", std::make_shared(0),
+ options);
+ ValidateBooleanAgg("[true, false, true, true, null]",
+ std::make_shared(0), options);
+
+ const ScalarAggregateOptions& options_min_count_zero =
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0);
+ ValidateBooleanAgg("[]", std::make_shared(1),
+ options_min_count_zero);
+ ValidateBooleanAgg("[null]", std::make_shared(1),
+ options_min_count_zero);
+
+ const char* json = "[true, null, true, null]";
+ ValidateBooleanAgg(
+ json, std::make_shared(1),
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1));
+ ValidateBooleanAgg(
+ json, std::make_shared(1),
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/2));
+ ValidateBooleanAgg(
+ json, std::make_shared(),
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/3));
+ ValidateBooleanAgg(
+ json, std::make_shared(1),
+ ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/1));
+ ValidateBooleanAgg(
+ json, std::make_shared(1),
+ ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/2));
+ ValidateBooleanAgg(
+ json, std::make_shared(),
+ ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/3));
+
+ EXPECT_THAT(Product(MakeScalar(true)),
+ ResultWith(Datum(std::make_shared<UInt64Scalar>(1))));
+ EXPECT_THAT(Product(MakeScalar(false)),
+ ResultWith(Datum(std::make_shared<UInt64Scalar>(0))));
+ EXPECT_THAT(Product(MakeNullScalar(boolean())),
+ ResultWith(Datum(MakeNullScalar(uint64()))));
+}
+
TEST(TestBooleanAggregation, Mean) {
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults();
ValidateBooleanAgg("[]", std::make_shared(), options);
@@ -415,6 +462,105 @@ TEST_F(TestSumKernelRoundOff, Basics) {
ASSERT_EQ(sum->value, 2756346749973250.0);
}
+//
+// Product
+//
+
+template <typename ArrowType>
+class TestNumericProductKernel : public ::testing::Test {};
+
+TYPED_TEST_SUITE(TestNumericProductKernel, NumericArrowTypes);
+TYPED_TEST(TestNumericProductKernel, SimpleProduct) {
+ using ProductType = typename FindAccumulatorType<TypeParam>::Type;
+ using T = typename TypeParam::c_type;
+ using ProductT = typename ProductType::c_type;
+
+ Datum null_result(std::make_shared<typename TypeTraits<ProductType>::ScalarType>());
+
+ auto ty = TypeTraits<TypeParam>::type_singleton();
+
+ EXPECT_THAT(Product(ArrayFromJSON(ty, "[]")), ResultWith(null_result));
+ EXPECT_THAT(Product(ArrayFromJSON(ty, "[null]")), ResultWith(null_result));
+ EXPECT_THAT(Product(ArrayFromJSON(ty, "[0, 1, 2, 3, 4, 5]")),
+ ResultWith(Datum(static_cast<ProductT>(0))));
+ Datum chunks = ChunkedArrayFromJSON(ty, {"[1, 2, 3, 4, 5]"});
+ EXPECT_THAT(Product(chunks), ResultWith(Datum(static_cast<ProductT>(120))));
+ chunks = ChunkedArrayFromJSON(ty, {"[1, 2]", "[3, 4, 5]"});
+ EXPECT_THAT(Product(chunks), ResultWith(Datum(static_cast<ProductT>(120))));
+ chunks = ChunkedArrayFromJSON(ty, {"[1, 2]", "[]", "[3, 4, 5]"});
+ EXPECT_THAT(Product(chunks), ResultWith(Datum(static_cast<ProductT>(120))));
+
+ const ScalarAggregateOptions& options =
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0);
+
+ EXPECT_THAT(Product(ArrayFromJSON(ty, "[]"), options), Datum(static_cast(1)));
+ EXPECT_THAT(Product(ArrayFromJSON(ty, "[null]"), options),
+ Datum(static_cast(1)));
+ chunks = ChunkedArrayFromJSON(ty, {});
+ EXPECT_THAT(Product(chunks, options), Datum(static_cast(1)));
+
+ EXPECT_THAT(Product(ArrayFromJSON(ty, "[1, null, 3, null, 3, null, 7]"), options),
+ Datum(static_cast(63)));
+
+ EXPECT_THAT(Product(Datum(static_cast<T>(5))),
+ ResultWith(Datum(static_cast<ProductT>(5))));
+ EXPECT_THAT(Product(MakeNullScalar(TypeTraits<TypeParam>::type_singleton())),
+ ResultWith(null_result));
+}
+
+TYPED_TEST(TestNumericProductKernel, ScalarAggregateOptions) {
+ using ProductType = typename FindAccumulatorType<TypeParam>::Type;
+ using T = typename TypeParam::c_type;
+ using ProductT = typename ProductType::c_type;
+
+ Datum null_result(std::make_shared<typename TypeTraits<ProductType>::ScalarType>());
+ Datum one_result(static_cast<ProductT>(1));
+ Datum result(static_cast<ProductT>(63));
+
+ auto ty = TypeTraits<TypeParam>::type_singleton();
+ Datum empty = ArrayFromJSON(ty, "[]");
+ Datum null = ArrayFromJSON(ty, "[null]");
+ Datum arr = ArrayFromJSON(ty, "[1, null, 3, null, 3, null, 7]");
+
+ EXPECT_THAT(
+ Product(empty, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0)),
+ ResultWith(one_result));
+ EXPECT_THAT(Product(null, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0)),
+ ResultWith(one_result));
+ EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/3)),
+ ResultWith(result));
+ EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/4)),
+ ResultWith(result));
+ EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/5)),
+ ResultWith(null_result));
+ EXPECT_THAT(
+ Product(empty, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1)),
+ ResultWith(null_result));
+ EXPECT_THAT(Product(null, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1)),
+ ResultWith(null_result));
+ EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/3)),
+ ResultWith(result));
+ EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/4)),
+ ResultWith(result));
+ EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/5)),
+ ResultWith(null_result));
+
+ EXPECT_THAT(
+ Product(Datum(static_cast<T>(5)), ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(Datum(static_cast<ProductT>(5))));
+ EXPECT_THAT(Product(Datum(static_cast<T>(5)),
+ ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/2)),
+ ResultWith(null_result));
+ EXPECT_THAT(Product(MakeNullScalar(TypeTraits<TypeParam>::type_singleton()),
+ ScalarAggregateOptions(/*skip_nulls=*/false)),
+ ResultWith(null_result));
+}
+
+TEST(TestProductKernel, Overflow) {
+ EXPECT_THAT(Product(ArrayFromJSON(int64(), "[8589934592, 8589934593]")),
+ ResultWith(Datum(static_cast<int64_t>(8589934592))));
+}
+
//
// Count
//
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index 6fa49d03d76..cf58bcdd93e 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -107,6 +107,18 @@ struct VarStdState {
}
}
+ // Scalar: textbook algorithm
+ void Consume(const Scalar& scalar, const int64_t count) {
+ this->m2 = 0;
+ if (scalar.is_valid) {
+ this->count = count;
+ this->mean = UnboxScalar<ArrowType>::Unbox(scalar);
+ } else {
+ this->count = 0;
+ this->mean = 0;
+ }
+ }
+
// Combine `m2` from two chunks (m2 = n*s2)
// https://www.emathzone.com/tutorials/basic-statistics/combined-variance.html
void MergeFrom(const ThisType& state) {
@@ -138,8 +150,12 @@ struct VarStdImpl : public ScalarAggregator {
: out_type(out_type), options(options), return_type(return_type) {}
Status Consume(KernelContext*, const ExecBatch& batch) override {
- ArrayType array(batch[0].array());
- this->state.Consume(array);
+ if (batch[0].is_array()) {
+ ArrayType array(batch[0].array());
+ this->state.Consume(array);
+ } else {
+ this->state.Consume(*batch[0].scalar(), batch.length);
+ }
return Status::OK();
}
@@ -166,34 +182,6 @@ struct VarStdImpl : public ScalarAggregator {
VarOrStd return_type;
};
-struct ScalarVarStdImpl : public ScalarAggregator {
- explicit ScalarVarStdImpl(const VarianceOptions& options)
- : options(options), seen(false) {}
-
- Status Consume(KernelContext*, const ExecBatch& batch) override {
- seen = batch[0].scalar()->is_valid;
- return Status::OK();
- }
-
- Status MergeFrom(KernelContext*, KernelState&& src) override {
- const auto& other = checked_cast<const ScalarVarStdImpl&>(src);
- seen = seen || other.seen;
- return Status::OK();
- }
-
- Status Finalize(KernelContext*, Datum* out) override {
- if (!seen || options.ddof > 0) {
- out->value = std::make_shared<DoubleScalar>();
- } else {
- out->value = std::make_shared<DoubleScalar>(0.0);
- }
- return Status::OK();
- }
-
- const VarianceOptions options;
- bool seen;
-};
-
struct VarStdInitState {
std::unique_ptr<KernelState> state;
KernelContext* ctx;
@@ -247,21 +235,12 @@ Result<std::unique_ptr<KernelState>> VarianceInit(KernelContext* ctx,
return visitor.Create();
}
-Result<std::unique_ptr<KernelState>> ScalarVarStdInit(KernelContext* ctx,
- const KernelInitArgs& args) {
- return arrow::internal::make_unique<ScalarVarStdImpl>(
- static_cast<const VarianceOptions&>(*args.options));
-}
-
void AddVarStdKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
- auto sig = KernelSignature::Make({InputType::Array(ty)}, float64());
+ auto sig = KernelSignature::Make({InputType(ty)}, float64());
AddAggKernel(std::move(sig), init, func);
-
- sig = KernelSignature::Make({InputType::Scalar(ty)}, float64());
- AddAggKernel(std::move(sig), ScalarVarStdInit, func);
}
}
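For reference, the scalar Consume() above may set m2 = 0 because a run of
identical values has zero variance; MergeFrom() then folds such states in
through the combined-variance formula cited earlier. A sketch of that merge
under the same assumptions (illustrative names, not code from this patch):

    struct ChunkStats {
      double count;  // number of non-null values (assumed > 0 when merging)
      double mean;   // mean of the chunk
      double m2;     // sum of squared deviations from the mean (count * variance)
    };

    ChunkStats CombineVarStd(const ChunkStats& a, const ChunkStats& b) {
      ChunkStats out;
      out.count = a.count + b.count;
      out.mean = (a.count * a.mean + b.count * b.mean) / out.count;
      out.m2 = a.m2 + b.m2 + a.count * (a.mean - out.mean) * (a.mean - out.mean) +
               b.count * (b.mean - out.mean) * (b.mean - out.mean);
      return out;
    }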
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index ba5c90f15de..b3d602a89ac 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -37,6 +37,7 @@
#include "arrow/compute/kernels/aggregate_internal.h"
#include "arrow/compute/kernels/aggregate_var_std_internal.h"
#include "arrow/compute/kernels/common.h"
+#include "arrow/compute/kernels/util_internal.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/bitmap_writer.h"
@@ -901,8 +902,9 @@ struct GroupedSumImpl : public GroupedAggregator {
using AccType = typename FindAccumulatorType<Type>::Type;
using SumType = typename TypeTraits<AccType>::CType;
- Status Init(ExecContext* ctx, const FunctionOptions*) override {
+ Status Init(ExecContext* ctx, const FunctionOptions* options) override {
pool_ = ctx->memory_pool();
+ options_ = checked_cast<const ScalarAggregateOptions&>(*options);
sums_ = BufferBuilder(pool_);
counts_ = BufferBuilder(pool_);
out_type_ = TypeTraits<AccType>::type_singleton();
@@ -955,10 +957,11 @@ struct GroupedSumImpl : public GroupedAggregator {
Result<Datum> Finalize() override {
std::shared_ptr<Buffer> null_bitmap;
+ const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
int64_t null_count = 0;
for (int64_t i = 0; i < num_groups_; ++i) {
- if (reinterpret_cast<const int64_t*>(counts_.data())[i] > 0) continue;
+ if (counts[i] >= options_.min_count) continue;
if (null_bitmap == nullptr) {
ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
@@ -980,6 +983,7 @@ struct GroupedSumImpl : public GroupedAggregator {
// NB: counts are used here instead of a simple "has_values_" bitmap since
// we expect to reuse this kernel to handle Mean
int64_t num_groups_ = 0;
+ ScalarAggregateOptions options_;
BufferBuilder sums_, counts_;
std::shared_ptr<DataType> out_type_;
MemoryPool* pool_;
@@ -1011,6 +1015,125 @@ struct GroupedSumFactory {
InputType argument_type;
};
+// ----------------------------------------------------------------------
+// Product implementation
+
+template <typename Type>
+struct GroupedProductImpl final : public GroupedAggregator {
+ using AccType = typename FindAccumulatorType<Type>::Type;
+ using ProductType = typename TypeTraits<AccType>::CType;
+
+ Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+ pool_ = ctx->memory_pool();
+ options_ = checked_cast<const ScalarAggregateOptions&>(*options);
+ products_ = TypedBufferBuilder<ProductType>(pool_);
+ counts_ = TypedBufferBuilder<int64_t>(pool_);
+ out_type_ = TypeTraits<AccType>::type_singleton();
+ return Status::OK();
+ }
+
+ Status Resize(int64_t new_num_groups) override {
+ auto added_groups = new_num_groups - num_groups_;
+ num_groups_ = new_num_groups;
+ RETURN_NOT_OK(products_.Append(added_groups * sizeof(AccType), 1));
+ RETURN_NOT_OK(counts_.Append(added_groups, 0));
+ return Status::OK();
+ }
+
+ Status Consume(const ExecBatch& batch) override {
+ ProductType* products = products_.mutable_data();
+ int64_t* counts = counts_.mutable_data();
+ auto g = batch[1].array()->GetValues<uint32_t>(1);
+ VisitArrayDataInline(
+ *batch[0].array(),
+ [&](typename TypeTraits<Type>::CType value) {
+ products[*g] = static_cast<ProductType>(
+ to_unsigned(products[*g]) * to_unsigned(static_cast<ProductType>(value)));
+ counts[*g++] += 1;
+ },
+ [&] { ++g; });
+ return Status::OK();
+ }
+
+ Status Merge(GroupedAggregator&& raw_other,
+ const ArrayData& group_id_mapping) override {
+ auto other = checked_cast<GroupedProductImpl*>(&raw_other);
+
+ int64_t* counts = counts_.mutable_data();
+ ProductType* products = products_.mutable_data();
+
+ const int64_t* other_counts = other->counts_.mutable_data();
+ const ProductType* other_products = other->products_.mutable_data();
+ const uint32_t* g = group_id_mapping.GetValues<uint32_t>(1);
+
+ for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+ products[*g] = static_cast<ProductType>(to_unsigned(products[*g]) *
+ to_unsigned(other_products[other_g]));
+ counts[*g] += other_counts[other_g];
+ }
+ return Status::OK();
+ }
+
+ Result Finalize() override {
+ ARROW_ASSIGN_OR_RAISE(auto products, products_.Finish());
+ const int64_t* counts = counts_.data();
+
+ std::shared_ptr<Buffer> null_bitmap;
+ int64_t null_count = 0;
+
+ for (int64_t i = 0; i < num_groups_; ++i) {
+ if (counts[i] >= options_.min_count) continue;
+
+ if (null_bitmap == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+ BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+ }
+
+ null_count += 1;
+ BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+ }
+
+ return ArrayData::Make(std::move(out_type_), num_groups_,
+ {std::move(null_bitmap), std::move(products)}, null_count);
+ }
+
+ std::shared_ptr<DataType> out_type() const override { return out_type_; }
+
+ int64_t num_groups_ = 0;
+ ScalarAggregateOptions options_;
+ TypedBufferBuilder<ProductType> products_;
+ TypedBufferBuilder<int64_t> counts_;
+ std::shared_ptr<DataType> out_type_;
+ MemoryPool* pool_;
+};
+
+struct GroupedProductFactory {
+ template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+ Status Visit(const T&) {
+ kernel =
+ MakeKernel(std::move(argument_type), HashAggregateInit<GroupedProductImpl<T>>);
+ return Status::OK();
+ }
+
+ Status Visit(const HalfFloatType& type) {
+ return Status::NotImplemented("Taking product of data of type ", type);
+ }
+
+ Status Visit(const DataType& type) {
+ return Status::NotImplemented("Taking product of data of type ", type);
+ }
+
+ static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+ GroupedProductFactory factory;
+ factory.argument_type = InputType::Array(type);
+ RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+ return std::move(factory.kernel);
+ }
+
+ HashAggregateKernel kernel;
+ InputType argument_type;
+};
+
// ----------------------------------------------------------------------
// Mean implementation
@@ -1027,7 +1150,7 @@ struct GroupedMeanImpl : public GroupedSumImpl {
const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
double* means = reinterpret_cast<double*>(values->mutable_data());
for (int64_t i = 0; i < num_groups_; ++i) {
- if (counts[i] > 0) {
+ if (counts[i] >= options_.min_count) {
means[i] = static_cast(sums[i]) / counts[i];
continue;
}
@@ -1049,6 +1172,7 @@ struct GroupedMeanImpl : public GroupedSumImpl {
std::shared_ptr<DataType> out_type() const override { return float64(); }
using GroupedSumImpl<Type>::num_groups_;
+ using GroupedSumImpl<Type>::options_;
using GroupedSumImpl<Type>::pool_;
using GroupedSumImpl<Type>::counts_;
using GroupedSumImpl<Type>::sums_;
@@ -1964,6 +2088,12 @@ const FunctionDoc hash_sum_doc{"Sum values of a numeric array",
("Null values are ignored."),
{"array", "group_id_array"}};
+const FunctionDoc hash_product_doc{
+ "Compute product of values of a numeric array",
+ ("Null values are ignored.\n"
+ "Overflow will wrap around as if the calculation was done with unsigned integers."),
+ {"array", "group_id_array"}};
+
const FunctionDoc hash_mean_doc{"Average values of a numeric array",
("Null values are ignored."),
{"array", "group_id_array"}};
@@ -2008,8 +2138,8 @@ const FunctionDoc hash_all_doc{"Test whether all elements evaluate to true",
} // namespace
void RegisterHashAggregateBasic(FunctionRegistry* registry) {
+ static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults();
{
- static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults();
auto func = std::make_shared<HashAggregateFunction>(
"hash_count", Arity::Binary(), &hash_count_doc,
&default_scalar_aggregate_options);
@@ -2020,8 +2150,8 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) {
}
{
- auto func = std::make_shared("hash_sum", Arity::Binary(),
- &hash_sum_doc);
+ auto func = std::make_shared(
+ "hash_sum", Arity::Binary(), &hash_sum_doc, &default_scalar_aggregate_options);
DCHECK_OK(AddHashAggKernels({boolean()}, GroupedSumFactory::Make, func.get()));
DCHECK_OK(AddHashAggKernels(SignedIntTypes(), GroupedSumFactory::Make, func.get()));
DCHECK_OK(AddHashAggKernels(UnsignedIntTypes(), GroupedSumFactory::Make, func.get()));
@@ -2031,8 +2161,22 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) {
}
{
- auto func = std::make_shared("hash_mean", Arity::Binary(),
- &hash_mean_doc);
+ auto func = std::make_shared(
+ "hash_product", Arity::Binary(), &hash_product_doc,
+ &default_scalar_aggregate_options);
+ DCHECK_OK(AddHashAggKernels({boolean()}, GroupedProductFactory::Make, func.get()));
+ DCHECK_OK(
+ AddHashAggKernels(SignedIntTypes(), GroupedProductFactory::Make, func.get()));
+ DCHECK_OK(
+ AddHashAggKernels(UnsignedIntTypes(), GroupedProductFactory::Make, func.get()));
+ DCHECK_OK(
+ AddHashAggKernels(FloatingPointTypes(), GroupedProductFactory::Make, func.get()));
+ DCHECK_OK(registry->AddFunction(std::move(func)));
+ }
+
+ {
+ auto func = std::make_shared<HashAggregateFunction>(
+ "hash_mean", Arity::Binary(), &hash_mean_doc, &default_scalar_aggregate_options);
DCHECK_OK(AddHashAggKernels({boolean()}, GroupedMeanFactory::Make, func.get()));
DCHECK_OK(AddHashAggKernels(SignedIntTypes(), GroupedMeanFactory::Make, func.get()));
DCHECK_OK(
@@ -2081,7 +2225,6 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) {
}
{
- static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults();
auto func = std::make_shared<HashAggregateFunction>(
"hash_min_max", Arity::Binary(), &hash_min_max_doc,
&default_scalar_aggregate_options);
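The grouped Finalize() implementations above now share one rule: a group's
output is null unless at least ScalarAggregateOptions::min_count non-null
values were consumed for it. A minimal sketch of that rule in isolation
(hypothetical helper, not code from this patch):

    #include <cstdint>
    #include <vector>

    // valid[i] says whether group i produces a non-null aggregate.
    std::vector<bool> GroupValidity(const std::vector<int64_t>& counts,
                                    int64_t min_count) {
      std::vector<bool> valid(counts.size());
      for (size_t i = 0; i < counts.size(); ++i) {
        valid[i] = counts[i] >= min_count;  // mirrors `counts[i] >= options_.min_count`
      }
      return valid;
    }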
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index b0114eac55c..cb9d05e0d35 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -767,24 +767,28 @@ TEST(GroupBy, MeanOnly) {
[null, 3]
])"});
+ ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
- internal::GroupBy({table->GetColumnByName("argument")},
+ internal::GroupBy({table->GetColumnByName("argument"),
+ table->GetColumnByName("argument")},
{table->GetColumnByName("key")},
{
{"hash_mean", nullptr},
+ {"hash_mean", &min_count},
},
use_threads));
SortBy({"key_0"}, &aggregated_and_grouped);
AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+ field("hash_mean", float64()),
field("hash_mean", float64()),
field("key_0", int64()),
}),
R"([
- [2.125, 1],
- [-0.041666666666666664, 2],
- [null, 3],
- [2.375, null]
+ [2.125, null, 1],
+ [-0.041666666666666664, -0.041666666666666664, 2],
+ [null, null, 3],
+ [2.375, null, null]
])"),
aggregated_and_grouped,
/*verbose=*/true);
@@ -1074,6 +1078,7 @@ TEST(GroupBy, CountAndSum) {
])");
ScalarAggregateOptions count_options;
+ ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
ASSERT_OK_AND_ASSIGN(
Datum aggregated_and_grouped,
internal::GroupBy(
@@ -1081,6 +1086,7 @@ TEST(GroupBy, CountAndSum) {
// NB: passing an argument twice or also using it as a key is legal
batch->GetColumnByName("argument"),
batch->GetColumnByName("argument"),
+ batch->GetColumnByName("argument"),
batch->GetColumnByName("key"),
},
{
@@ -1089,6 +1095,7 @@ TEST(GroupBy, CountAndSum) {
{
{"hash_count", &count_options},
{"hash_sum", nullptr},
+ {"hash_sum", &min_count},
{"hash_sum", nullptr},
}));
@@ -1097,19 +1104,94 @@ TEST(GroupBy, CountAndSum) {
field("hash_count", int64()),
// NB: summing a float32 array results in float64 sums
field("hash_sum", float64()),
+ field("hash_sum", float64()),
field("hash_sum", int64()),
field("key_0", int64()),
}),
R"([
- [2, 4.25, 3, 1],
- [3, -0.125, 6, 2],
- [0, null, 6, 3],
- [2, 4.75, null, null]
+ [2, 4.25, null, 3, 1],
+ [3, -0.125, -0.125, 6, 2],
+ [0, null, null, 6, 3],
+ [2, 4.75, null, null, null]
])"),
aggregated_and_grouped,
/*verbose=*/true);
}
+TEST(GroupBy, Product) {
+ auto batch = RecordBatchFromJSON(
+ schema({field("argument", float64()), field("key", int64())}), R"([
+ [-1.0, 1],
+ [null, 1],
+ [0.0, 2],
+ [null, 3],
+ [4.0, null],
+ [3.25, 1],
+ [0.125, 2],
+ [-0.25, 2],
+ [0.75, null],
+ [null, 3]
+ ])");
+
+ ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
+ ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+ internal::GroupBy(
+ {
+ batch->GetColumnByName("argument"),
+ batch->GetColumnByName("key"),
+ batch->GetColumnByName("argument"),
+ },
+ {
+ batch->GetColumnByName("key"),
+ },
+ {
+ {"hash_product", nullptr},
+ {"hash_product", nullptr},
+ {"hash_product", &min_count},
+ }));
+
+ AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+ field("hash_product", float64()),
+ field("hash_product", int64()),
+ field("hash_product", float64()),
+ field("key_0", int64()),
+ }),
+ R"([
+ [-3.25, 1, null, 1],
+ [0.0, 8, 0.0, 2],
+ [null, 9, null, 3],
+ [3.0, null, null, null]
+ ])"),
+ aggregated_and_grouped,
+ /*verbose=*/true);
+
+ // Overflow should wrap around
+ batch = RecordBatchFromJSON(schema({field("argument", int64()), field("key", int64())}),
+ R"([
+ [8589934592, 1],
+ [8589934593, 1]
+ ])");
+
+ ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, internal::GroupBy(
+ {
+ batch->GetColumnByName("argument"),
+ },
+ {
+ batch->GetColumnByName("key"),
+ },
+ {
+ {"hash_product", nullptr},
+ }));
+
+ AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+ field("hash_product", int64()),
+ field("key_0", int64()),
+ }),
+ R"([[8589934592, 1]])"),
+ aggregated_and_grouped,
+ /*verbose=*/true);
+}
+
TEST(GroupBy, SumOnlyStringAndDictKeys) {
for (auto key_type : {utf8(), dictionary(int32(), utf8())}) {
SCOPED_TRACE("key type: " + key_type->ToString());
@@ -1205,6 +1287,7 @@ TEST(GroupBy, CountNull) {
}
TEST(GroupBy, RandomArraySum) {
+ ScalarAggregateOptions options(/*skip_nulls=*/true, /*min_count=*/0);
for (int64_t length : {1 << 10, 1 << 12, 1 << 15}) {
for (auto null_probability : {0.0, 0.01, 0.5, 1.0}) {
auto batch = random::GenerateBatch(
@@ -1218,7 +1301,7 @@ TEST(GroupBy, RandomArraySum) {
ValidateGroupBy(
{
- {"hash_sum", nullptr},
+ {"hash_sum", &options},
},
{batch->GetColumnByName("argument")}, {batch->GetColumnByName("key")});
}
diff --git a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
index a5d4a557740..bca363022bd 100644
--- a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
@@ -47,6 +47,9 @@ using applicator::ScalarUnaryNotNull;
namespace {
+// N.B. take care not to conflict with type_traits.h as that can cause surprises in a
+// unity build
+
template <typename T>
using is_unsigned_integer = std::integral_constant<bool, std::is_integral<T>::value &&
std::is_unsigned<T>::value>;
@@ -56,28 +59,23 @@ using is_signed_integer =
std::integral_constant<bool, std::is_integral<T>::value && std::is_signed<T>::value>;
template <typename T, typename R = T>
-using enable_if_signed_integer = enable_if_t<is_signed_integer<T>::value, R>;
+using enable_if_signed_c_integer = enable_if_t<is_signed_integer<T>::value, R>;
template <typename T, typename R = T>
-using enable_if_unsigned_integer = enable_if_t<is_unsigned_integer<T>::value, R>;
+using enable_if_unsigned_c_integer = enable_if_t<is_unsigned_integer<T>::value, R>;
template <typename T, typename R = T>
-using enable_if_integer =
+using enable_if_c_integer =
enable_if_t<is_signed_integer<T>::value || is_unsigned_integer<T>::value, R>;
template <typename T, typename R = T>
using enable_if_floating_point = enable_if_t<std::is_floating_point<T>::value, R>;
template <typename T>
-using enable_if_decimal =
+using enable_if_decimal_value =
enable_if_t<std::is_same<T, Decimal128>::value || std::is_same<T, Decimal256>::value,
T>;
-template <typename T, typename Unsigned = typename std::make_unsigned<T>::type>
-constexpr Unsigned to_unsigned(T signed_) {
- return static_cast<Unsigned>(signed_);
-}
-
struct AbsoluteValue {
template <typename T>
static constexpr enable_if_floating_point<T> Call(KernelContext*, T arg, Status*) {
@@ -85,19 +83,19 @@ struct AbsoluteValue {
}
template <typename T>
- static constexpr enable_if_unsigned_integer<T> Call(KernelContext*, T arg, Status*) {
+ static constexpr enable_if_unsigned_c_integer<T> Call(KernelContext*, T arg, Status*) {
return arg;
}
template <typename T>
- static constexpr enable_if_signed_integer<T> Call(KernelContext*, T arg, Status* st) {
+ static constexpr enable_if_signed_c_integer<T> Call(KernelContext*, T arg, Status* st) {
return (arg < 0) ? arrow::internal::SafeSignedNegate(arg) : arg;
}
};
struct AbsoluteValueChecked {
template <typename T, typename Arg>
- static enable_if_signed_integer<T> Call(KernelContext*, Arg arg, Status* st) {
+ static enable_if_signed_c_integer<T> Call(KernelContext*, Arg arg, Status* st) {
static_assert(std::is_same<T, Arg>::value, "");
if (arg == std::numeric_limits<Arg>::min()) {
*st = Status::Invalid("overflow");
@@ -107,7 +105,7 @@ struct AbsoluteValueChecked {
}
template <typename T, typename Arg>
- static enable_if_unsigned_integer<T> Call(KernelContext* ctx, Arg arg, Status* st) {
+ static enable_if_unsigned_c_integer<T> Call(KernelContext* ctx, Arg arg, Status* st) {
static_assert(std::is_same<T, Arg>::value, "");
return arg;
}
@@ -127,26 +125,26 @@ struct Add {
}
template <typename T, typename Arg0, typename Arg1>
- static constexpr enable_if_unsigned_integer<T> Call(KernelContext*, Arg0 left,
- Arg1 right, Status*) {
+ static constexpr enable_if_unsigned_c_integer<T> Call(KernelContext*, Arg0 left,
+ Arg1 right, Status*) {
return left + right;
}
template <typename T, typename Arg0, typename Arg1>
- static constexpr enable_if_signed_integer<T> Call(KernelContext*, Arg0 left, Arg1 right,
- Status*) {
+ static constexpr enable_if_signed_c_integer<T> Call(KernelContext*, Arg0 left,
+ Arg1 right, Status*) {
return arrow::internal::SafeSignedAdd(left, right);
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_decimal<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+ static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
return left + right;
}
};
struct AddChecked {
template <typename T, typename Arg0, typename Arg1>
- static enable_if_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+ static enable_if_c_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
static_assert(std::is_same<T, Arg0>::value && std::is_same<T, Arg1>::value, "");
T result = 0;
if (ARROW_PREDICT_FALSE(AddWithOverflow(left, right, &result))) {
@@ -163,7 +161,7 @@ struct AddChecked {
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_decimal<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+ static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
return left + right;
}
};
@@ -177,28 +175,28 @@ struct Subtract {
}
template <typename T, typename Arg0, typename Arg1>
- static constexpr enable_if_unsigned_integer<T> Call(KernelContext*, Arg0 left,
- Arg1 right, Status*) {
+ static constexpr enable_if_unsigned_c_integer<T> Call(KernelContext*, Arg0 left,
+ Arg1 right, Status*) {
static_assert(std::is_same<T, Arg0>::value && std::is_same<T, Arg1>::value, "");
return left - right;
}
template <typename T, typename Arg0, typename Arg1>
- static constexpr enable_if_signed_integer<T> Call(KernelContext*, Arg0 left, Arg1 right,
- Status*) {
+ static constexpr enable_if_signed_c_integer<T> Call(KernelContext*, Arg0 left,
+ Arg1 right, Status*) {
static_assert(std::is_same<T, Arg0>::value && std::is_same<T, Arg1>::value, "");
return arrow::internal::SafeSignedSubtract(left, right);
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_decimal<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+ static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
return left + (-right);
}
};
struct SubtractChecked {
template <typename T, typename Arg0, typename Arg1>
- static enable_if_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+ static enable_if_c_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
static_assert(std::is_same<T, Arg0>::value && std::is_same<T, Arg1>::value, "");
T result = 0;
if (ARROW_PREDICT_FALSE(SubtractWithOverflow(left, right, &result))) {
@@ -215,7 +213,7 @@ struct SubtractChecked {
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_decimal<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+ static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
return left + (-right);
}
};
@@ -266,14 +264,14 @@ struct Multiply {
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_decimal<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+ static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
return left * right;
}
};
struct MultiplyChecked {
template <typename T, typename Arg0, typename Arg1>
- static enable_if_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+ static enable_if_c_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
static_assert(std::is_same<T, Arg0>::value && std::is_same<T, Arg1>::value, "");
T result = 0;
if (ARROW_PREDICT_FALSE(MultiplyWithOverflow(left, right, &result))) {
@@ -290,7 +288,7 @@ struct MultiplyChecked {
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_decimal<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+ static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
return left * right;
}
};
@@ -303,7 +301,7 @@ struct Divide {
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+ static enable_if_c_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
T result;
if (ARROW_PREDICT_FALSE(DivideWithOverflow(left, right, &result))) {
if (right == 0) {
@@ -316,7 +314,8 @@ struct Divide {
}
template <typename T, typename Arg0, typename Arg1>
- static enable_if_decimal<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+ static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right,
+ Status* st) {
if (right == Arg1()) {
*st = Status::Invalid("Divide by zero");
return T();
@@ -328,7 +327,7 @@ struct Divide {
struct DivideChecked {
template <typename T, typename Arg0, typename Arg1>
- static enable_if_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+ static enable_if_c_integer<T> Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
static_assert(std::is_same::value && std::is_same