diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 1b00c366bfd..2261333a880 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -145,6 +145,11 @@ Result Mean(const Datum& value, const ScalarAggregateOptions& options, return CallFunction("mean", {value}, &options, ctx); } +Result Product(const Datum& value, const ScalarAggregateOptions& options, + ExecContext* ctx) { + return CallFunction("product", {value}, &options, ctx); +} + Result Sum(const Datum& value, const ScalarAggregateOptions& options, ExecContext* ctx) { return CallFunction("sum", {value}, &options, ctx); diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index d66d4f1517c..d73be1af97a 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -169,6 +169,21 @@ Result Mean( const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Compute the product of values of a numeric array. +/// +/// \param[in] value datum to compute product of, expecting Array or ChunkedArray +/// \param[in] options see ScalarAggregateOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return datum of the computed sum as a Scalar +/// +/// \since 6.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result Product( + const Datum& value, + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), + ExecContext* ctx = NULLPTR); + /// \brief Sum values of a numeric array. /// /// \param[in] value datum to sum, expecting Array or ChunkedArray diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 1ca620304cf..3b3d39fd36a 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -569,40 +569,54 @@ TEST(ExecPlanExecution, SourceScalarAggSink) { } TEST(ExecPlanExecution, ScalarSourceScalarAggSink) { + // ARROW-9056: scalar aggregation can be done over scalars, taking + // into account batch.length > 1 (e.g. 
a partition column) ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); AsyncGenerator> sink_gen; BatchesWithSchema scalar_data; scalar_data.batches = { - ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(int32()), - ValueDescr::Scalar(int32())}, - "[[5, 5, 5], [5, 5, 5], [5, 5, 5]]"), - ExecBatchFromJSON({int32(), int32(), int32()}, - "[[5, 5, 5], [6, 6, 6], [7, 7, 7]]")}; - scalar_data.schema = - schema({field("a", int32()), field("b", int32()), field("c", int32())}); - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{scalar_data.schema, - scalar_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"aggregate", - AggregateNodeOptions{ - /*aggregates=*/{ - {"count", nullptr}, {"sum", nullptr}, {"mean", nullptr}}, - /*targets=*/{"a", "b", "c"}, - /*names=*/{"count(a)", "sum(b)", "mean(c)"}}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(boolean())}, + "[[5, false], [5, false], [5, false]]"), + ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")}; + scalar_data.schema = schema({field("a", int32()), field("b", boolean())}); + + // index can't be tested as it's order-dependent + // mode/quantile can't be tested as they're technically vector kernels + ASSERT_OK( + Declaration::Sequence( + { + {"source", + SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false, + /*slow=*/false)}}, + {"aggregate", AggregateNodeOptions{ + /*aggregates=*/{{"all", nullptr}, + {"any", nullptr}, + {"count", nullptr}, + {"mean", nullptr}, + {"product", nullptr}, + {"stddev", nullptr}, + {"sum", nullptr}, + {"tdigest", nullptr}, + {"variance", nullptr}}, + /*targets=*/{"b", "b", "a", "a", "a", "a", "a", "a", "a"}, + /*names=*/ + {"all(b)", "any(b)", "count(a)", "mean(a)", "product(a)", + "stddev(a)", "sum(a)", "tdigest(a)", "variance(a)"}}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); ASSERT_THAT( StartAndCollect(plan.get(), sink_gen), Finishes(ResultWith(UnorderedElementsAreArray({ - ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(int64()), - ValueDescr::Scalar(float64())}, - "[[6, 33, 5.5]]"), + ExecBatchFromJSON( + {ValueDescr::Scalar(boolean()), ValueDescr::Scalar(boolean()), + ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()), + ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()), + ValueDescr::Scalar(int64()), ValueDescr::Array(float64()), + ValueDescr::Scalar(float64())}, + R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"), })))); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index a7df66695b2..bc92c203687 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -19,6 +19,7 @@ #include "arrow/compute/kernels/aggregate_basic_internal.h" #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/common.h" +#include "arrow/compute/kernels/util_internal.h" #include "arrow/util/cpu_info.h" #include "arrow/util/make_unique.h" @@ -133,6 +134,109 @@ Result> MeanInit(KernelContext* ctx, return visitor.Create(); } +// ---------------------------------------------------------------------- +// Product implementation + +using arrow::compute::internal::to_unsigned; + +template +struct ProductImpl : public ScalarAggregator { + using ThisType = ProductImpl; + using AccType = typename 
FindAccumulatorType::Type; + using ProductType = typename TypeTraits::CType; + using OutputType = typename TypeTraits::ScalarType; + + explicit ProductImpl(const ScalarAggregateOptions& options) { this->options = options; } + + Status Consume(KernelContext*, const ExecBatch& batch) override { + if (batch[0].is_array()) { + const auto& data = batch[0].array(); + this->count += data->length - data->GetNullCount(); + VisitArrayDataInline( + *data, + [&](typename TypeTraits::CType value) { + this->product = + static_cast(to_unsigned(this->product) * to_unsigned(value)); + }, + [] {}); + } else { + const auto& data = *batch[0].scalar(); + this->count += data.is_valid * batch.length; + if (data.is_valid) { + for (int64_t i = 0; i < batch.length; i++) { + auto value = internal::UnboxScalar::Unbox(data); + this->product = + static_cast(to_unsigned(this->product) * to_unsigned(value)); + } + } + } + return Status::OK(); + } + + Status MergeFrom(KernelContext*, KernelState&& src) override { + const auto& other = checked_cast(src); + this->count += other.count; + this->product = + static_cast(to_unsigned(this->product) * to_unsigned(other.product)); + return Status::OK(); + } + + Status Finalize(KernelContext*, Datum* out) override { + if (this->count < options.min_count) { + out->value = std::make_shared(); + } else { + out->value = MakeScalar(this->product); + } + return Status::OK(); + } + + size_t count = 0; + typename AccType::c_type product = 1; + ScalarAggregateOptions options; +}; + +struct ProductInit { + std::unique_ptr state; + KernelContext* ctx; + const DataType& type; + const ScalarAggregateOptions& options; + + ProductInit(KernelContext* ctx, const DataType& type, + const ScalarAggregateOptions& options) + : ctx(ctx), type(type), options(options) {} + + Status Visit(const DataType&) { + return Status::NotImplemented("No product implemented"); + } + + Status Visit(const HalfFloatType&) { + return Status::NotImplemented("No product implemented"); + } + + Status Visit(const BooleanType&) { + state.reset(new ProductImpl(options)); + return Status::OK(); + } + + template + enable_if_number Visit(const Type&) { + state.reset(new ProductImpl(options)); + return Status::OK(); + } + + Result> Create() { + RETURN_NOT_OK(VisitTypeInline(type, this)); + return std::move(state); + } + + static Result> Init(KernelContext* ctx, + const KernelInitArgs& args) { + ProductInit visitor(ctx, *args.inputs[0].type, + static_cast(*args.options)); + return visitor.Create(); + } +}; + // ---------------------------------------------------------------------- // MinMax implementation @@ -290,9 +394,22 @@ struct IndexImpl : public ScalarAggregator { return Status::OK(); } + const ArgValue desired = internal::UnboxScalar::Unbox(*options.value); + + if (batch[0].is_scalar()) { + seen = batch.length; + if (batch[0].scalar()->is_valid) { + const ArgValue v = internal::UnboxScalar::Unbox(*batch[0].scalar()); + if (v == desired) { + index = 0; + return Status::Cancelled("Found"); + } + } + return Status::OK(); + } + auto input = batch[0].array(); seen = input->length; - const ArgValue desired = internal::UnboxScalar::Unbox(*options.value); int64_t i = 0; ARROW_UNUSED(internal::VisitArrayValuesInline( @@ -455,6 +572,14 @@ const FunctionDoc sum_doc{ {"array"}, "ScalarAggregateOptions"}; +const FunctionDoc product_doc{ + "Compute the product of values in a numeric array", + ("Null values are ignored by default. 
Minimum count of non-null\n" + "values can be set and null is returned if too few are present.\n" + "This can be changed through ScalarAggregateOptions."), + {"array"}, + "ScalarAggregateOptions"}; + const FunctionDoc mean_doc{ "Compute the mean of a numeric array", ("Null values are ignored by default. Minimum count of non-null\n" @@ -513,7 +638,7 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { func = std::make_shared("sum", Arity::Unary(), &sum_doc, &default_scalar_aggregate_options); - aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, int64(), + aggregate::AddArrayScalarAggKernels(aggregate::SumInit, {boolean()}, uint64(), func.get()); aggregate::AddArrayScalarAggKernels(aggregate::SumInit, SignedIntTypes(), int64(), func.get()); @@ -574,6 +699,18 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunction(std::move(func))); + func = std::make_shared( + "product", Arity::Unary(), &product_doc, &default_scalar_aggregate_options); + aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, {boolean()}, uint64(), + func.get()); + aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, SignedIntTypes(), + int64(), func.get()); + aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, UnsignedIntTypes(), + uint64(), func.get()); + aggregate::AddArrayScalarAggKernels(aggregate::ProductInit::Init, FloatingPointTypes(), + float64(), func.get()); + DCHECK_OK(registry->AddFunction(std::move(func))); + // any func = std::make_shared("any", Arity::Unary(), &any_doc, &default_scalar_aggregate_options); diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc index 4c261604c85..fd0533edd4f 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -55,7 +55,9 @@ struct TDigestImpl : public ScalarAggregator { } else { const CType value = UnboxScalar::Unbox(*batch[0].scalar()); if (batch[0].scalar()->is_valid) { - this->tdigest.NanAdd(value); + for (int64_t i = 0; i < batch.length; i++) { + this->tdigest.NanAdd(value); + } } } return Status::OK(); diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index ec12b0913b8..db96bc3cad8 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -189,6 +189,53 @@ TEST(TestBooleanAggregation, Sum) { ResultWith(Datum(MakeNullScalar(uint64())))); } +TEST(TestBooleanAggregation, Product) { + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(); + ValidateBooleanAgg("[]", std::make_shared(), options); + ValidateBooleanAgg("[null]", std::make_shared(), options); + ValidateBooleanAgg("[null, false]", std::make_shared(0), + options); + ValidateBooleanAgg("[true]", std::make_shared(1), options); + ValidateBooleanAgg("[true, false, true]", std::make_shared(0), + options); + ValidateBooleanAgg("[true, false, true, true, null]", + std::make_shared(0), options); + + const ScalarAggregateOptions& options_min_count_zero = + ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0); + ValidateBooleanAgg("[]", std::make_shared(1), + options_min_count_zero); + ValidateBooleanAgg("[null]", std::make_shared(1), + options_min_count_zero); + + const char* json = "[true, null, true, null]"; + ValidateBooleanAgg( + json, std::make_shared(1), + ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1)); + 
ValidateBooleanAgg( + json, std::make_shared(1), + ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/2)); + ValidateBooleanAgg( + json, std::make_shared(), + ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/3)); + ValidateBooleanAgg( + json, std::make_shared(1), + ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/1)); + ValidateBooleanAgg( + json, std::make_shared(1), + ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/2)); + ValidateBooleanAgg( + json, std::make_shared(), + ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/3)); + + EXPECT_THAT(Product(MakeScalar(true)), + ResultWith(Datum(std::make_shared(1)))); + EXPECT_THAT(Product(MakeScalar(false)), + ResultWith(Datum(std::make_shared(0)))); + EXPECT_THAT(Product(MakeNullScalar(boolean())), + ResultWith(Datum(MakeNullScalar(uint64())))); +} + TEST(TestBooleanAggregation, Mean) { const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(); ValidateBooleanAgg("[]", std::make_shared(), options); @@ -415,6 +462,105 @@ TEST_F(TestSumKernelRoundOff, Basics) { ASSERT_EQ(sum->value, 2756346749973250.0); } +// +// Product +// + +template +class TestNumericProductKernel : public ::testing::Test {}; + +TYPED_TEST_SUITE(TestNumericProductKernel, NumericArrowTypes); +TYPED_TEST(TestNumericProductKernel, SimpleProduct) { + using ProductType = typename FindAccumulatorType::Type; + using T = typename TypeParam::c_type; + using ProductT = typename ProductType::c_type; + + Datum null_result(std::make_shared::ScalarType>()); + + auto ty = TypeTraits::type_singleton(); + + EXPECT_THAT(Product(ArrayFromJSON(ty, "[]")), ResultWith(null_result)); + EXPECT_THAT(Product(ArrayFromJSON(ty, "[null]")), ResultWith(null_result)); + EXPECT_THAT(Product(ArrayFromJSON(ty, "[0, 1, 2, 3, 4, 5]")), + ResultWith(Datum(static_cast(0)))); + Datum chunks = ChunkedArrayFromJSON(ty, {"[1, 2, 3, 4, 5]"}); + EXPECT_THAT(Product(chunks), ResultWith(Datum(static_cast(120)))); + chunks = ChunkedArrayFromJSON(ty, {"[1, 2]", "[3, 4, 5]"}); + EXPECT_THAT(Product(chunks), ResultWith(Datum(static_cast(120)))); + chunks = ChunkedArrayFromJSON(ty, {"[1, 2]", "[]", "[3, 4, 5]"}); + EXPECT_THAT(Product(chunks), ResultWith(Datum(static_cast(120)))); + + const ScalarAggregateOptions& options = + ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0); + + EXPECT_THAT(Product(ArrayFromJSON(ty, "[]"), options), Datum(static_cast(1))); + EXPECT_THAT(Product(ArrayFromJSON(ty, "[null]"), options), + Datum(static_cast(1))); + chunks = ChunkedArrayFromJSON(ty, {}); + EXPECT_THAT(Product(chunks, options), Datum(static_cast(1))); + + EXPECT_THAT(Product(ArrayFromJSON(ty, "[1, null, 3, null, 3, null, 7]"), options), + Datum(static_cast(63))); + + EXPECT_THAT(Product(Datum(static_cast(5))), + ResultWith(Datum(static_cast(5)))); + EXPECT_THAT(Product(MakeNullScalar(TypeTraits::type_singleton())), + ResultWith(null_result)); +} + +TYPED_TEST(TestNumericProductKernel, ScalarAggregateOptions) { + using ProductType = typename FindAccumulatorType::Type; + using T = typename TypeParam::c_type; + using ProductT = typename ProductType::c_type; + + Datum null_result(std::make_shared::ScalarType>()); + Datum one_result(static_cast(1)); + Datum result(static_cast(63)); + + auto ty = TypeTraits::type_singleton(); + Datum empty = ArrayFromJSON(ty, "[]"); + Datum null = ArrayFromJSON(ty, "[null]"); + Datum arr = ArrayFromJSON(ty, "[1, null, 3, null, 3, null, 7]"); + + EXPECT_THAT( + Product(empty, ScalarAggregateOptions(/*skip_nulls=*/true, 
/*min_count=*/0)), + ResultWith(one_result)); + EXPECT_THAT(Product(null, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0)), + ResultWith(one_result)); + EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/3)), + ResultWith(result)); + EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/4)), + ResultWith(result)); + EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/5)), + ResultWith(null_result)); + EXPECT_THAT( + Product(empty, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1)), + ResultWith(null_result)); + EXPECT_THAT(Product(null, ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1)), + ResultWith(null_result)); + EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/3)), + ResultWith(result)); + EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/4)), + ResultWith(result)); + EXPECT_THAT(Product(arr, ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/5)), + ResultWith(null_result)); + + EXPECT_THAT( + Product(Datum(static_cast(5)), ScalarAggregateOptions(/*skip_nulls=*/false)), + ResultWith(Datum(static_cast(5)))); + EXPECT_THAT(Product(Datum(static_cast(5)), + ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/2)), + ResultWith(null_result)); + EXPECT_THAT(Product(MakeNullScalar(TypeTraits::type_singleton()), + ScalarAggregateOptions(/*skip_nulls=*/false)), + ResultWith(null_result)); +} + +TEST(TestProductKernel, Overflow) { + EXPECT_THAT(Product(ArrayFromJSON(int64(), "[8589934592, 8589934593]")), + ResultWith(Datum(static_cast(8589934592)))); +} + // // Count // diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc index 6fa49d03d76..4fcea3e8e3a 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc @@ -107,6 +107,18 @@ struct VarStdState { } } + // Scalar: textbook algorithm + void Consume(const Scalar& scalar, const int64_t count) { + this->m2 = 0; + if (scalar.is_valid) { + this->count = count; + this->mean = static_cast(UnboxScalar::Unbox(scalar)); + } else { + this->count = 0; + this->mean = 0; + } + } + // Combine `m2` from two chunks (m2 = n*s2) // https://www.emathzone.com/tutorials/basic-statistics/combined-variance.html void MergeFrom(const ThisType& state) { @@ -138,8 +150,12 @@ struct VarStdImpl : public ScalarAggregator { : out_type(out_type), options(options), return_type(return_type) {} Status Consume(KernelContext*, const ExecBatch& batch) override { - ArrayType array(batch[0].array()); - this->state.Consume(array); + if (batch[0].is_array()) { + ArrayType array(batch[0].array()); + this->state.Consume(array); + } else { + this->state.Consume(*batch[0].scalar(), batch.length); + } return Status::OK(); } @@ -166,34 +182,6 @@ struct VarStdImpl : public ScalarAggregator { VarOrStd return_type; }; -struct ScalarVarStdImpl : public ScalarAggregator { - explicit ScalarVarStdImpl(const VarianceOptions& options) - : options(options), seen(false) {} - - Status Consume(KernelContext*, const ExecBatch& batch) override { - seen = batch[0].scalar()->is_valid; - return Status::OK(); - } - - Status MergeFrom(KernelContext*, KernelState&& src) override { - const auto& other = checked_cast(src); - seen = seen || other.seen; - return Status::OK(); - } - - Status Finalize(KernelContext*, Datum* out) override { - if (!seen || options.ddof > 0) { - out->value 
= std::make_shared(); - } else { - out->value = std::make_shared(0.0); - } - return Status::OK(); - } - - const VarianceOptions options; - bool seen; -}; - struct VarStdInitState { std::unique_ptr state; KernelContext* ctx; @@ -247,21 +235,12 @@ Result> VarianceInit(KernelContext* ctx, return visitor.Create(); } -Result> ScalarVarStdInit(KernelContext* ctx, - const KernelInitArgs& args) { - return arrow::internal::make_unique( - static_cast(*args.options)); -} - void AddVarStdKernels(KernelInit init, const std::vector>& types, ScalarAggregateFunction* func) { for (const auto& ty : types) { - auto sig = KernelSignature::Make({InputType::Array(ty)}, float64()); + auto sig = KernelSignature::Make({InputType(ty)}, float64()); AddAggKernel(std::move(sig), init, func); - - sig = KernelSignature::Make({InputType::Scalar(ty)}, float64()); - AddAggKernel(std::move(sig), ScalarVarStdInit, func); } } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index ba5c90f15de..b3d602a89ac 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -37,6 +37,7 @@ #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/aggregate_var_std_internal.h" #include "arrow/compute/kernels/common.h" +#include "arrow/compute/kernels/util_internal.h" #include "arrow/util/bit_run_reader.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/bitmap_writer.h" @@ -901,8 +902,9 @@ struct GroupedSumImpl : public GroupedAggregator { using AccType = typename FindAccumulatorType::Type; using SumType = typename TypeTraits::CType; - Status Init(ExecContext* ctx, const FunctionOptions*) override { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { pool_ = ctx->memory_pool(); + options_ = checked_cast(*options); sums_ = BufferBuilder(pool_); counts_ = BufferBuilder(pool_); out_type_ = TypeTraits::type_singleton(); @@ -955,10 +957,11 @@ struct GroupedSumImpl : public GroupedAggregator { Result Finalize() override { std::shared_ptr null_bitmap; + const int64_t* counts = reinterpret_cast(counts_.data()); int64_t null_count = 0; for (int64_t i = 0; i < num_groups_; ++i) { - if (reinterpret_cast(counts_.data())[i] > 0) continue; + if (counts[i] >= options_.min_count) continue; if (null_bitmap == nullptr) { ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_)); @@ -980,6 +983,7 @@ struct GroupedSumImpl : public GroupedAggregator { // NB: counts are used here instead of a simple "has_values_" bitmap since // we expect to reuse this kernel to handle Mean int64_t num_groups_ = 0; + ScalarAggregateOptions options_; BufferBuilder sums_, counts_; std::shared_ptr out_type_; MemoryPool* pool_; @@ -1011,6 +1015,125 @@ struct GroupedSumFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// Product implementation + +template +struct GroupedProductImpl final : public GroupedAggregator { + using AccType = typename FindAccumulatorType::Type; + using ProductType = typename TypeTraits::CType; + + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + pool_ = ctx->memory_pool(); + options_ = checked_cast(*options); + products_ = TypedBufferBuilder(pool_); + counts_ = TypedBufferBuilder(pool_); + out_type_ = TypeTraits::type_singleton(); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = 
new_num_groups; + RETURN_NOT_OK(products_.Append(added_groups * sizeof(AccType), 1)); + RETURN_NOT_OK(counts_.Append(added_groups, 0)); + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + ProductType* products = products_.mutable_data(); + int64_t* counts = counts_.mutable_data(); + auto g = batch[1].array()->GetValues(1); + VisitArrayDataInline( + *batch[0].array(), + [&](typename TypeTraits::CType value) { + products[*g] = static_cast( + to_unsigned(products[*g]) * to_unsigned(static_cast(value))); + counts[*g++] += 1; + }, + [&] { ++g; }); + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast(&raw_other); + + int64_t* counts = counts_.mutable_data(); + ProductType* products = products_.mutable_data(); + + const int64_t* other_counts = other->counts_.mutable_data(); + const ProductType* other_products = other->products_.mutable_data(); + const uint32_t* g = group_id_mapping.GetValues(1); + + for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { + products[*g] = static_cast(to_unsigned(products[*g]) * + to_unsigned(other_products[other_g])); + counts[*g] += other_counts[other_g]; + } + return Status::OK(); + } + + Result Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto products, products_.Finish()); + const int64_t* counts = counts_.data(); + + std::shared_ptr null_bitmap; + int64_t null_count = 0; + + for (int64_t i = 0; i < num_groups_; ++i) { + if (counts[i] >= options_.min_count) continue; + + if (null_bitmap == nullptr) { + ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_)); + BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true); + } + + null_count += 1; + BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false); + } + + return ArrayData::Make(std::move(out_type_), num_groups_, + {std::move(null_bitmap), std::move(products)}, null_count); + } + + std::shared_ptr out_type() const override { return out_type_; } + + int64_t num_groups_ = 0; + ScalarAggregateOptions options_; + TypedBufferBuilder products_; + TypedBufferBuilder counts_; + std::shared_ptr out_type_; + MemoryPool* pool_; +}; + +struct GroupedProductFactory { + template ::Type> + Status Visit(const T&) { + kernel = + MakeKernel(std::move(argument_type), HashAggregateInit>); + return Status::OK(); + } + + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Taking product of data of type ", type); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Taking product of data of type ", type); + } + + static Result Make(const std::shared_ptr& type) { + GroupedProductFactory factory; + factory.argument_type = InputType::Array(type); + RETURN_NOT_OK(VisitTypeInline(*type, &factory)); + return std::move(factory.kernel); + } + + HashAggregateKernel kernel; + InputType argument_type; +}; + // ---------------------------------------------------------------------- // Mean implementation @@ -1027,7 +1150,7 @@ struct GroupedMeanImpl : public GroupedSumImpl { const auto* sums = reinterpret_cast(sums_.data()); double* means = reinterpret_cast(values->mutable_data()); for (int64_t i = 0; i < num_groups_; ++i) { - if (counts[i] > 0) { + if (counts[i] >= options_.min_count) { means[i] = static_cast(sums[i]) / counts[i]; continue; } @@ -1049,6 +1172,7 @@ struct GroupedMeanImpl : public GroupedSumImpl { std::shared_ptr out_type() const override { return float64(); } using GroupedSumImpl::num_groups_; + using 
GroupedSumImpl::options_; using GroupedSumImpl::pool_; using GroupedSumImpl::counts_; using GroupedSumImpl::sums_; @@ -1964,6 +2088,12 @@ const FunctionDoc hash_sum_doc{"Sum values of a numeric array", ("Null values are ignored."), {"array", "group_id_array"}}; +const FunctionDoc hash_product_doc{ + "Compute product of values of a numeric array", + ("Null values are ignored.\n" + "Overflow will wrap around as if the calculation was done with unsigned integers."), + {"array", "group_id_array"}}; + const FunctionDoc hash_mean_doc{"Average values of a numeric array", ("Null values are ignored."), {"array", "group_id_array"}}; @@ -2008,8 +2138,8 @@ const FunctionDoc hash_all_doc{"Test whether all elements evaluate to true", } // namespace void RegisterHashAggregateBasic(FunctionRegistry* registry) { + static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); { - static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); auto func = std::make_shared( "hash_count", Arity::Binary(), &hash_count_doc, &default_scalar_aggregate_options); @@ -2020,8 +2150,8 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { } { - auto func = std::make_shared("hash_sum", Arity::Binary(), - &hash_sum_doc); + auto func = std::make_shared( + "hash_sum", Arity::Binary(), &hash_sum_doc, &default_scalar_aggregate_options); DCHECK_OK(AddHashAggKernels({boolean()}, GroupedSumFactory::Make, func.get())); DCHECK_OK(AddHashAggKernels(SignedIntTypes(), GroupedSumFactory::Make, func.get())); DCHECK_OK(AddHashAggKernels(UnsignedIntTypes(), GroupedSumFactory::Make, func.get())); @@ -2031,8 +2161,22 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { } { - auto func = std::make_shared("hash_mean", Arity::Binary(), - &hash_mean_doc); + auto func = std::make_shared( + "hash_product", Arity::Binary(), &hash_product_doc, + &default_scalar_aggregate_options); + DCHECK_OK(AddHashAggKernels({boolean()}, GroupedProductFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(SignedIntTypes(), GroupedProductFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(UnsignedIntTypes(), GroupedProductFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(FloatingPointTypes(), GroupedProductFactory::Make, func.get())); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + + { + auto func = std::make_shared( + "hash_mean", Arity::Binary(), &hash_mean_doc, &default_scalar_aggregate_options); DCHECK_OK(AddHashAggKernels({boolean()}, GroupedMeanFactory::Make, func.get())); DCHECK_OK(AddHashAggKernels(SignedIntTypes(), GroupedMeanFactory::Make, func.get())); DCHECK_OK( @@ -2081,7 +2225,6 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { } { - static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); auto func = std::make_shared( "hash_min_max", Arity::Binary(), &hash_min_max_doc, &default_scalar_aggregate_options); diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index b0114eac55c..cb9d05e0d35 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -767,24 +767,28 @@ TEST(GroupBy, MeanOnly) { [null, 3] ])"}); + ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3); ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, - internal::GroupBy({table->GetColumnByName("argument")}, + internal::GroupBy({table->GetColumnByName("argument"), + 
table->GetColumnByName("argument")}, {table->GetColumnByName("key")}, { {"hash_mean", nullptr}, + {"hash_mean", &min_count}, }, use_threads)); SortBy({"key_0"}, &aggregated_and_grouped); AssertDatumsApproxEqual(ArrayFromJSON(struct_({ + field("hash_mean", float64()), field("hash_mean", float64()), field("key_0", int64()), }), R"([ - [2.125, 1], - [-0.041666666666666664, 2], - [null, 3], - [2.375, null] + [2.125, null, 1], + [-0.041666666666666664, -0.041666666666666664, 2], + [null, null, 3], + [2.375, null, null] ])"), aggregated_and_grouped, /*verbose=*/true); @@ -1074,6 +1078,7 @@ TEST(GroupBy, CountAndSum) { ])"); ScalarAggregateOptions count_options; + ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3); ASSERT_OK_AND_ASSIGN( Datum aggregated_and_grouped, internal::GroupBy( @@ -1081,6 +1086,7 @@ TEST(GroupBy, CountAndSum) { // NB: passing an argument twice or also using it as a key is legal batch->GetColumnByName("argument"), batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), batch->GetColumnByName("key"), }, { @@ -1089,6 +1095,7 @@ TEST(GroupBy, CountAndSum) { { {"hash_count", &count_options}, {"hash_sum", nullptr}, + {"hash_sum", &min_count}, {"hash_sum", nullptr}, })); @@ -1097,19 +1104,94 @@ TEST(GroupBy, CountAndSum) { field("hash_count", int64()), // NB: summing a float32 array results in float64 sums field("hash_sum", float64()), + field("hash_sum", float64()), field("hash_sum", int64()), field("key_0", int64()), }), R"([ - [2, 4.25, 3, 1], - [3, -0.125, 6, 2], - [0, null, 6, 3], - [2, 4.75, null, null] + [2, 4.25, null, 3, 1], + [3, -0.125, -0.125, 6, 2], + [0, null, null, 6, 3], + [2, 4.75, null, null, null] ])"), aggregated_and_grouped, /*verbose=*/true); } +TEST(GroupBy, Product) { + auto batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("key", int64())}), R"([ + [-1.0, 1], + [null, 1], + [0.0, 2], + [null, 3], + [4.0, null], + [3.25, 1], + [0.125, 2], + [-0.25, 2], + [0.75, null], + [null, 3] + ])"); + + ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3); + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + internal::GroupBy( + { + batch->GetColumnByName("argument"), + batch->GetColumnByName("key"), + batch->GetColumnByName("argument"), + }, + { + batch->GetColumnByName("key"), + }, + { + {"hash_product", nullptr}, + {"hash_product", nullptr}, + {"hash_product", &min_count}, + })); + + AssertDatumsApproxEqual(ArrayFromJSON(struct_({ + field("hash_product", float64()), + field("hash_product", int64()), + field("hash_product", float64()), + field("key_0", int64()), + }), + R"([ + [-3.25, 1, null, 1], + [0.0, 8, 0.0, 2], + [null, 9, null, 3], + [3.0, null, null, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + + // Overflow should wrap around + batch = RecordBatchFromJSON(schema({field("argument", int64()), field("key", int64())}), + R"([ + [8589934592, 1], + [8589934593, 1] + ])"); + + ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, internal::GroupBy( + { + batch->GetColumnByName("argument"), + }, + { + batch->GetColumnByName("key"), + }, + { + {"hash_product", nullptr}, + })); + + AssertDatumsApproxEqual(ArrayFromJSON(struct_({ + field("hash_product", int64()), + field("key_0", int64()), + }), + R"([[8589934592, 1]])"), + aggregated_and_grouped, + /*verbose=*/true); +} + TEST(GroupBy, SumOnlyStringAndDictKeys) { for (auto key_type : {utf8(), dictionary(int32(), utf8())}) { SCOPED_TRACE("key type: " + key_type->ToString()); @@ -1205,6 +1287,7 @@ TEST(GroupBy, CountNull) { } 
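Editor's note on the overflow expectations above (TestProductKernel.Overflow and the wrap-around case in GroupBy.Product): the sum/product kernels accumulate through to_unsigned(), so integer products wrap modulo 2^64 instead of hitting signed-overflow undefined behaviour. A standalone sketch of the arithmetic behind the expected value, not part of the patch:

    // 8589934592 * 8589934593 = 2^33 * (2^33 + 1) = 2^66 + 2^33
    // (2^66 + 2^33) mod 2^64  = 2^33 = 8589934592, hence the expected result.
    #include <cassert>
    #include <cstdint>

    int main() {
      const uint64_t a = 8589934592ULL;  // 2^33
      const uint64_t b = 8589934593ULL;  // 2^33 + 1
      const uint64_t wrapped = a * b;    // unsigned multiplication wraps mod 2^64
      assert(static_cast<int64_t>(wrapped) == INT64_C(8589934592));
      return 0;
    }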
TEST(GroupBy, RandomArraySum) { + ScalarAggregateOptions options(/*skip_nulls=*/true, /*min_count=*/0); for (int64_t length : {1 << 10, 1 << 12, 1 << 15}) { for (auto null_probability : {0.0, 0.01, 0.5, 1.0}) { auto batch = random::GenerateBatch( @@ -1218,7 +1301,7 @@ TEST(GroupBy, RandomArraySum) { ValidateGroupBy( { - {"hash_sum", nullptr}, + {"hash_sum", &options}, }, {batch->GetColumnByName("argument")}, {batch->GetColumnByName("key")}); } diff --git a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc index a5d4a557740..bca363022bd 100644 --- a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc +++ b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc @@ -47,6 +47,9 @@ using applicator::ScalarUnaryNotNull; namespace { +// N.B. take care not to conflict with type_traits.h as that can cause surprises in a +// unity build + template using is_unsigned_integer = std::integral_constant::value && std::is_unsigned::value>; @@ -56,28 +59,23 @@ using is_signed_integer = std::integral_constant::value && std::is_signed::value>; template -using enable_if_signed_integer = enable_if_t::value, R>; +using enable_if_signed_c_integer = enable_if_t::value, R>; template -using enable_if_unsigned_integer = enable_if_t::value, R>; +using enable_if_unsigned_c_integer = enable_if_t::value, R>; template -using enable_if_integer = +using enable_if_c_integer = enable_if_t::value || is_unsigned_integer::value, R>; template using enable_if_floating_point = enable_if_t::value, R>; template -using enable_if_decimal = +using enable_if_decimal_value = enable_if_t::value || std::is_same::value, T>; -template ::type> -constexpr Unsigned to_unsigned(T signed_) { - return static_cast(signed_); -} - struct AbsoluteValue { template static constexpr enable_if_floating_point Call(KernelContext*, T arg, Status*) { @@ -85,19 +83,19 @@ struct AbsoluteValue { } template - static constexpr enable_if_unsigned_integer Call(KernelContext*, T arg, Status*) { + static constexpr enable_if_unsigned_c_integer Call(KernelContext*, T arg, Status*) { return arg; } template - static constexpr enable_if_signed_integer Call(KernelContext*, T arg, Status* st) { + static constexpr enable_if_signed_c_integer Call(KernelContext*, T arg, Status* st) { return (arg < 0) ? 
arrow::internal::SafeSignedNegate(arg) : arg; } }; struct AbsoluteValueChecked { template - static enable_if_signed_integer Call(KernelContext*, Arg arg, Status* st) { + static enable_if_signed_c_integer Call(KernelContext*, Arg arg, Status* st) { static_assert(std::is_same::value, ""); if (arg == std::numeric_limits::min()) { *st = Status::Invalid("overflow"); @@ -107,7 +105,7 @@ struct AbsoluteValueChecked { } template - static enable_if_unsigned_integer Call(KernelContext* ctx, Arg arg, Status* st) { + static enable_if_unsigned_c_integer Call(KernelContext* ctx, Arg arg, Status* st) { static_assert(std::is_same::value, ""); return arg; } @@ -127,26 +125,26 @@ struct Add { } template - static constexpr enable_if_unsigned_integer Call(KernelContext*, Arg0 left, - Arg1 right, Status*) { + static constexpr enable_if_unsigned_c_integer Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { return left + right; } template - static constexpr enable_if_signed_integer Call(KernelContext*, Arg0 left, Arg1 right, - Status*) { + static constexpr enable_if_signed_c_integer Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { return arrow::internal::SafeSignedAdd(left, right); } template - static enable_if_decimal Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { return left + right; } }; struct AddChecked { template - static enable_if_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + static enable_if_c_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { static_assert(std::is_same::value && std::is_same::value, ""); T result = 0; if (ARROW_PREDICT_FALSE(AddWithOverflow(left, right, &result))) { @@ -163,7 +161,7 @@ struct AddChecked { } template - static enable_if_decimal Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { return left + right; } }; @@ -177,28 +175,28 @@ struct Subtract { } template - static constexpr enable_if_unsigned_integer Call(KernelContext*, Arg0 left, - Arg1 right, Status*) { + static constexpr enable_if_unsigned_c_integer Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { static_assert(std::is_same::value && std::is_same::value, ""); return left - right; } template - static constexpr enable_if_signed_integer Call(KernelContext*, Arg0 left, Arg1 right, - Status*) { + static constexpr enable_if_signed_c_integer Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { static_assert(std::is_same::value && std::is_same::value, ""); return arrow::internal::SafeSignedSubtract(left, right); } template - static enable_if_decimal Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { return left + (-right); } }; struct SubtractChecked { template - static enable_if_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + static enable_if_c_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { static_assert(std::is_same::value && std::is_same::value, ""); T result = 0; if (ARROW_PREDICT_FALSE(SubtractWithOverflow(left, right, &result))) { @@ -215,7 +213,7 @@ struct SubtractChecked { } template - static enable_if_decimal Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { return left + (-right); } }; @@ -266,14 +264,14 @@ struct Multiply { } template - static 
enable_if_decimal Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { return left * right; } }; struct MultiplyChecked { template - static enable_if_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + static enable_if_c_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { static_assert(std::is_same::value && std::is_same::value, ""); T result = 0; if (ARROW_PREDICT_FALSE(MultiplyWithOverflow(left, right, &result))) { @@ -290,7 +288,7 @@ struct MultiplyChecked { } template - static enable_if_decimal Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { return left * right; } }; @@ -303,7 +301,7 @@ struct Divide { } template - static enable_if_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + static enable_if_c_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { T result; if (ARROW_PREDICT_FALSE(DivideWithOverflow(left, right, &result))) { if (right == 0) { @@ -316,7 +314,8 @@ struct Divide { } template - static enable_if_decimal Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, + Status* st) { if (right == Arg1()) { *st = Status::Invalid("Divide by zero"); return T(); @@ -328,7 +327,7 @@ struct Divide { struct DivideChecked { template - static enable_if_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + static enable_if_c_integer Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { static_assert(std::is_same::value && std::is_same::value, ""); T result; if (ARROW_PREDICT_FALSE(DivideWithOverflow(left, right, &result))) { @@ -353,8 +352,8 @@ struct DivideChecked { } template - static enable_if_decimal Call(KernelContext* ctx, Arg0 left, Arg1 right, - Status* st) { + static enable_if_decimal_value Call(KernelContext* ctx, Arg0 left, Arg1 right, + Status* st) { return Divide::Call(ctx, left, right, st); } }; @@ -366,19 +365,20 @@ struct Negate { } template - static constexpr enable_if_unsigned_integer Call(KernelContext*, Arg arg, Status*) { + static constexpr enable_if_unsigned_c_integer Call(KernelContext*, Arg arg, + Status*) { return ~arg + 1; } template - static constexpr enable_if_signed_integer Call(KernelContext*, Arg arg, Status*) { + static constexpr enable_if_signed_c_integer Call(KernelContext*, Arg arg, Status*) { return arrow::internal::SafeSignedNegate(arg); } }; struct NegateChecked { template - static enable_if_signed_integer Call(KernelContext*, Arg arg, Status* st) { + static enable_if_signed_c_integer Call(KernelContext*, Arg arg, Status* st) { static_assert(std::is_same::value, ""); T result = 0; if (ARROW_PREDICT_FALSE(NegateWithOverflow(arg, &result))) { @@ -388,7 +388,7 @@ struct NegateChecked { } template - static enable_if_unsigned_integer Call(KernelContext* ctx, Arg arg, Status* st) { + static enable_if_unsigned_c_integer Call(KernelContext* ctx, Arg arg, Status* st) { static_assert(std::is_same::value, ""); DCHECK(false) << "This is included only for the purposes of instantiability from the " "arithmetic kernel generator"; @@ -416,7 +416,7 @@ struct Power { } template - static enable_if_integer Call(KernelContext*, T base, T exp, Status* st) { + static enable_if_c_integer Call(KernelContext*, T base, T exp, Status* st) { if (exp < 0) { *st = Status::Invalid("integers to negative integer powers are not allowed"); return 0; 
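Editor's note on the enable_if_*_integer to enable_if_*_c_integer renames in this file: per the "N.B." comment added near the top, arrow/type_traits.h already defines enable_if_signed_integer and friends in terms of Arrow DataTypes, so reusing the same names for raw C-type traits can collide in a unity build. A minimal illustration of the distinction (my own sketch, not code from the patch):

    #include <type_traits>

    // Local alias constraining a raw C type, mirroring the renamed helper.
    template <typename T, typename R = T>
    using enable_if_signed_c_integer =
        typename std::enable_if<std::is_integral<T>::value && std::is_signed<T>::value,
                                R>::type;

    // Participates in overload resolution only for signed C integers such as
    // int32_t; arrow::enable_if_signed_integer instead expects an Arrow DataType
    // such as arrow::Int32Type.
    template <typename T>
    enable_if_signed_c_integer<T, bool> IsNegative(T value) {
      return value < 0;
    }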
@@ -432,7 +432,7 @@ struct Power { struct PowerChecked { template - static enable_if_integer Call(KernelContext*, Arg0 base, Arg1 exp, Status* st) { + static enable_if_c_integer Call(KernelContext*, Arg0 base, Arg1 exp, Status* st) { if (exp < 0) { *st = Status::Invalid("integers to negative integer powers are not allowed"); return 0; @@ -471,12 +471,13 @@ struct Sign { } template - static constexpr enable_if_unsigned_integer Call(KernelContext*, Arg arg, Status*) { + static constexpr enable_if_unsigned_c_integer Call(KernelContext*, Arg arg, + Status*) { return arg > 0; } template - static constexpr enable_if_signed_integer Call(KernelContext*, Arg arg, Status*) { + static constexpr enable_if_signed_c_integer Call(KernelContext*, Arg arg, Status*) { return (arg > 0) ? 1 : ((arg == 0) ? 0 : -1); } }; @@ -526,8 +527,8 @@ struct ShiftLeft { // See SEI CERT C Coding Standard rule INT34-C struct ShiftLeftChecked { template - static enable_if_unsigned_integer Call(KernelContext*, Arg0 lhs, Arg1 rhs, - Status* st) { + static enable_if_unsigned_c_integer Call(KernelContext*, Arg0 lhs, Arg1 rhs, + Status* st) { static_assert(std::is_same::value, ""); if (ARROW_PREDICT_FALSE(rhs < 0 || rhs >= std::numeric_limits::digits)) { *st = Status::Invalid("shift amount must be >= 0 and less than precision of type"); @@ -537,8 +538,8 @@ struct ShiftLeftChecked { } template - static enable_if_signed_integer Call(KernelContext*, Arg0 lhs, Arg1 rhs, - Status* st) { + static enable_if_signed_c_integer Call(KernelContext*, Arg0 lhs, Arg1 rhs, + Status* st) { using Unsigned = typename std::make_unsigned::type; static_assert(std::is_same::value, ""); if (ARROW_PREDICT_FALSE(rhs < 0 || rhs >= std::numeric_limits::digits)) { diff --git a/cpp/src/arrow/compute/kernels/util_internal.h b/cpp/src/arrow/compute/kernels/util_internal.h index 394e08da581..8d21e34b3bf 100644 --- a/cpp/src/arrow/compute/kernels/util_internal.h +++ b/cpp/src/arrow/compute/kernels/util_internal.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "arrow/array/util.h" @@ -42,6 +43,16 @@ namespace internal { #define M_PI_4 0.785398163397448309616 #endif +template +using maybe_make_unsigned = + typename std::conditional::value && !std::is_same::value, + std::make_unsigned, std::common_type >::type; + +template ::type> +constexpr Unsigned to_unsigned(T signed_) { + return static_cast(signed_); +} + // An internal data structure for unpacking a primitive argument to pass to a // kernel implementation struct PrimitiveArg { diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 0540f806522..58321457ab0 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -200,11 +200,13 @@ Aggregations +---------------+-------+-------------+----------------+----------------------------------+-------+ | mode | Unary | Numeric | Struct | :struct:`ModeOptions` | \(3) | +---------------+-------+-------------+----------------+----------------------------------+-------+ -| quantile | Unary | Numeric | Scalar Numeric | :struct:`QuantileOptions` | \(4) | +| product | Unary | Numeric | Scalar Numeric | :struct:`ScalarAggregateOptions` | \(4) | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| quantile | Unary | Numeric | Scalar Numeric | :struct:`QuantileOptions` | \(5) | +---------------+-------+-------------+----------------+----------------------------------+-------+ | stddev | Unary | Numeric | Scalar Float64 | :struct:`VarianceOptions` | | 
 +---------------+-------+-------------+----------------+----------------------------------+-------+
-| sum           | Unary | Numeric     | Scalar Numeric | :struct:`ScalarAggregateOptions` | \(5)  |
+| sum           | Unary | Numeric     | Scalar Numeric | :struct:`ScalarAggregateOptions` | \(4)  |
 +---------------+-------+-------------+----------------+----------------------------------+-------+
 | tdigest       | Unary | Numeric     | Scalar Float64 | :struct:`TDigestOptions`         |       |
 +---------------+-------+-------------+----------------+----------------------------------+-------+
@@ -225,9 +227,9 @@ Notes:
   Note that the output can have less than *N* elements if the input has
   less than *N* distinct values.
 
-* \(4) Output is Float64 or input type, depending on QuantileOptions.
+* \(4) Output is Int64, UInt64 or Float64, depending on the input type.
 
-* \(5) Output is Int64, UInt64 or Float64, depending on the input type.
+* \(5) Output is Float64 or input type, depending on QuantileOptions.
 
 Element-wise ("scalar") functions
 ---------------------------------
diff --git a/docs/source/python/api/compute.rst b/docs/source/python/api/compute.rst
index 08d6e9da051..c5f16d144ef 100644
--- a/docs/source/python/api/compute.rst
+++ b/docs/source/python/api/compute.rst
@@ -32,6 +32,7 @@ Aggregations
    mean
    min_max
    mode
+   product
    stddev
    sum
    variance
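For reference, a caller-side sketch of the new scalar aggregate (not part of the patch; ProductOf is a hypothetical helper name): the Product() wrapper declared in api_aggregate.h follows the existing Sum()/Mean() pattern, and with the default ScalarAggregateOptions (skip_nulls=true, min_count=1) an empty or all-null input yields a null scalar, as exercised in the tests above.

    #include "arrow/compute/api_aggregate.h"
    #include "arrow/datum.h"
    #include "arrow/result.h"

    namespace cp = arrow::compute;

    // Product of an Array or ChunkedArray datum; nulls are skipped, and the
    // result is null unless at least one non-null value is present.
    arrow::Result<arrow::Datum> ProductOf(const arrow::Datum& values) {
      cp::ScalarAggregateOptions options(/*skip_nulls=*/true, /*min_count=*/1);
      return cp::Product(values, options);
    }

The hash_product kernel takes the same options; groups with fewer than min_count non-null values emit null, as exercised in GroupBy.Product above.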