diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 4fd6af9b190..9222c5dd18f 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1718,11 +1718,11 @@ struct GroupedMinMaxFactory { } Status Visit(const HalfFloatType& type) { - return Status::NotImplemented("Summing data of type ", type); + return Status::NotImplemented("Computing min/max of data of type ", type); } Status Visit(const DataType& type) { - return Status::NotImplemented("Summing data of type ", type); + return Status::NotImplemented("Computing min/max of data of type ", type); } static Result Make(const std::shared_ptr& type) { @@ -1740,15 +1740,18 @@ struct GroupedMinMaxFactory { // Any/All implementation struct GroupedAnyImpl : public GroupedAggregator { - Status Init(ExecContext* ctx, const FunctionOptions*) override { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + options_ = *checked_cast(options); seen_ = TypedBufferBuilder(ctx->memory_pool()); + has_nulls_ = TypedBufferBuilder(ctx->memory_pool()); return Status::OK(); } Status Resize(int64_t new_num_groups) override { auto added_groups = new_num_groups - num_groups_; num_groups_ = new_num_groups; - return seen_.Append(added_groups, false); + RETURN_NOT_OK(seen_.Append(added_groups, false)); + return has_nulls_.Append(added_groups, false); } Status Merge(GroupedAggregator&& raw_other, @@ -1757,29 +1760,51 @@ struct GroupedAnyImpl : public GroupedAggregator { auto seen = seen_.mutable_data(); auto other_seen = other->seen_.data(); + auto has_nulls = has_nulls_.mutable_data(); + auto other_has_nulls = other->has_nulls_.data(); auto g = group_id_mapping.GetValues(1); for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { if (BitUtil::GetBit(other_seen, other_g)) BitUtil::SetBitTo(seen, *g, true); + if (BitUtil::GetBit(other_has_nulls, other_g)) { + BitUtil::SetBitTo(has_nulls, *g, true); + } } return Status::OK(); } Status Consume(const ExecBatch& batch) override { auto seen = seen_.mutable_data(); + auto has_nulls = has_nulls_.mutable_data(); const auto& input = *batch[0].array(); auto g = batch[1].array()->GetValues(1); - arrow::internal::VisitTwoBitBlocksVoid( - input.buffers[0], input.offset, input.buffers[1], input.offset, input.length, - [&](int64_t) { BitUtil::SetBitTo(seen, *g++, true); }, [&]() { g++; }); + auto values = input.buffers[1]->data(); + arrow::internal::VisitBitBlocksVoid( + input.buffers[0], input.offset, input.length, + [&](int64_t offset) { + BitUtil::SetBitTo(seen, *g, + BitUtil::GetBit(seen, *g) || + BitUtil::GetBit(values, input.offset + offset)); + g++; + }, + [&]() { BitUtil::SetBitTo(has_nulls, *g++, true); }); return Status::OK(); } Result Finalize() override { ARROW_ASSIGN_OR_RAISE(auto seen, seen_.Finish()); - return std::make_shared(num_groups_, std::move(seen)); + if (options_.skip_nulls) { + return std::make_shared(num_groups_, std::move(seen)); + } + ARROW_ASSIGN_OR_RAISE(auto bitmap, has_nulls_.Finish()); + // null if (~seen & has_nulls) -> not null if (seen | ~has_nulls) + ::arrow::internal::BitmapOrNot(seen->data(), /*left_offset=*/0, bitmap->data(), + /*right_offset=*/0, num_groups_, /*out_offset=*/0, + bitmap->mutable_data()); + return std::make_shared(num_groups_, std::move(seen), + std::move(bitmap)); } std::shared_ptr out_type() const override { return boolean(); } @@ -1787,18 +1812,22 @@ struct GroupedAnyImpl : public GroupedAggregator { int64_t num_groups_ = 0; ScalarAggregateOptions options_; TypedBufferBuilder seen_; + TypedBufferBuilder has_nulls_; }; struct GroupedAllImpl : public GroupedAggregator { - Status Init(ExecContext* ctx, const FunctionOptions*) override { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + options_ = *checked_cast(options); seen_ = TypedBufferBuilder(ctx->memory_pool()); + has_nulls_ = TypedBufferBuilder(ctx->memory_pool()); return Status::OK(); } Status Resize(int64_t new_num_groups) override { auto added_groups = new_num_groups - num_groups_; num_groups_ = new_num_groups; - return seen_.Append(added_groups, true); + RETURN_NOT_OK(seen_.Append(added_groups, true)); + return has_nulls_.Append(added_groups, false); } Status Merge(GroupedAggregator&& raw_other, @@ -1807,17 +1836,23 @@ struct GroupedAllImpl : public GroupedAggregator { auto seen = seen_.mutable_data(); auto other_seen = other->seen_.data(); + auto has_nulls = has_nulls_.mutable_data(); + auto other_has_nulls = other->has_nulls_.data(); auto g = group_id_mapping.GetValues(1); for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { BitUtil::SetBitTo( seen, *g, BitUtil::GetBit(seen, *g) && BitUtil::GetBit(other_seen, other_g)); + if (BitUtil::GetBit(other_has_nulls, other_g)) { + BitUtil::SetBitTo(has_nulls, *g, true); + } } return Status::OK(); } Status Consume(const ExecBatch& batch) override { auto seen = seen_.mutable_data(); + auto has_nulls = has_nulls_.mutable_data(); const auto& input = *batch[0].array(); @@ -1832,7 +1867,7 @@ struct GroupedAllImpl : public GroupedAggregator { BitUtil::GetBit(bitmap, input.offset + position)); g++; }, - [&]() { g++; }); + [&]() { BitUtil::SetBitTo(has_nulls, *g++, true); }); } else { arrow::internal::VisitBitBlocksVoid( input.buffers[1], input.offset, input.length, [&](int64_t) { g++; }, @@ -1843,7 +1878,18 @@ struct GroupedAllImpl : public GroupedAggregator { Result Finalize() override { ARROW_ASSIGN_OR_RAISE(auto seen, seen_.Finish()); - return std::make_shared(num_groups_, std::move(seen)); + if (options_.skip_nulls) { + return std::make_shared(num_groups_, std::move(seen)); + } + ARROW_ASSIGN_OR_RAISE(auto bitmap, has_nulls_.Finish()); + // null if (seen & has_nulls) + ::arrow::internal::BitmapAnd(seen->data(), /*left_offset=*/0, bitmap->data(), + /*right_offset=*/0, num_groups_, /*out_offset=*/0, + bitmap->mutable_data()); + ::arrow::internal::InvertBitmap(bitmap->data(), /*offset=*/0, num_groups_, + bitmap->mutable_data(), /*dest_offset=*/0); + return std::make_shared(num_groups_, std::move(seen), + std::move(bitmap)); } std::shared_ptr out_type() const override { return boolean(); } @@ -1851,6 +1897,7 @@ struct GroupedAllImpl : public GroupedAggregator { int64_t num_groups_ = 0; ScalarAggregateOptions options_; TypedBufferBuilder seen_; + TypedBufferBuilder has_nulls_; }; } // namespace @@ -2122,7 +2169,8 @@ const FunctionDoc hash_count_doc{"Count the number of null / non-null values", const FunctionDoc hash_sum_doc{"Sum values of a numeric array", ("Null values are ignored."), - {"array", "group_id_array"}}; + {"array", "group_id_array"}, + "ScalarAggregateOptions"}; const FunctionDoc hash_product_doc{ "Compute product of values of a numeric array", @@ -2132,7 +2180,8 @@ const FunctionDoc hash_product_doc{ const FunctionDoc hash_mean_doc{"Average values of a numeric array", ("Null values are ignored."), - {"array", "group_id_array"}}; + {"array", "group_id_array"}, + "ScalarAggregateOptions"}; const FunctionDoc hash_stddev_doc{ "Calculate the standard deviation of a numeric array", @@ -2155,7 +2204,8 @@ const FunctionDoc hash_tdigest_doc{ ("By default, the 0.5 quantile (median) is returned.\n" "Nulls and NaNs are ignored.\n" "A null array is returned if there are no valid data points."), - {"array", "group_id_array"}}; + {"array", "group_id_array"}, + "TDigestOptions"}; const FunctionDoc hash_min_max_doc{ "Compute the minimum and maximum values of a numeric array", @@ -2175,6 +2225,9 @@ const FunctionDoc hash_all_doc{"Test whether all elements evaluate to true", void RegisterHashAggregateBasic(FunctionRegistry* registry) { static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); + static auto default_tdigest_options = TDigestOptions::Defaults(); + static auto default_variance_options = VarianceOptions::Defaults(); + { static auto default_count_options = CountOptions::Defaults(); auto func = std::make_shared( @@ -2222,7 +2275,6 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunction(std::move(func))); } - static auto default_variance_options = VarianceOptions::Defaults(); { auto func = std::make_shared( "hash_stddev", Arity::Binary(), &hash_stddev_doc, &default_variance_options); @@ -2247,7 +2299,6 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunction(std::move(func))); } - static auto default_tdigest_options = TDigestOptions::Defaults(); { auto func = std::make_shared( "hash_tdigest", Arity::Binary(), &hash_tdigest_doc, &default_tdigest_options); @@ -2264,7 +2315,6 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { auto func = std::make_shared( "hash_min_max", Arity::Binary(), &hash_min_max_doc, &default_scalar_aggregate_options); - DCHECK_OK(AddHashAggKernels({boolean()}, GroupedSumFactory::Make, func.get())); DCHECK_OK(AddHashAggKernels(NumericTypes(), GroupedMinMaxFactory::Make, func.get())); // Type parameters are ignored DCHECK_OK(AddHashAggKernels({decimal128(1, 1), decimal256(1, 1)}, @@ -2273,15 +2323,15 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { } { - auto func = std::make_shared("hash_any", Arity::Binary(), - &hash_any_doc); + auto func = std::make_shared( + "hash_any", Arity::Binary(), &hash_any_doc, &default_scalar_aggregate_options); DCHECK_OK(func->AddKernel(MakeKernel(boolean(), HashAggregateInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } { - auto func = std::make_shared("hash_all", Arity::Binary(), - &hash_all_doc); + auto func = std::make_shared( + "hash_all", Arity::Binary(), &hash_all_doc, &default_scalar_aggregate_options); DCHECK_OK(func->AddKernel(MakeKernel(boolean(), HashAggregateInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index c69b51e71fc..e96fdcd6084 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1083,6 +1083,7 @@ TEST(GroupBy, MinMaxDecimal) { } TEST(GroupBy, AnyAndAll) { + ScalarAggregateOptions options(/*skip_nulls=*/false); for (bool use_threads : {true, false}) { SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); @@ -1094,37 +1095,57 @@ TEST(GroupBy, AnyAndAll) { R"([ [false, 2], [null, 3], + [null, 4], + [false, 4], + [true, 5], [false, null], [true, 1], [true, 2] ])", R"([ - [true, 2], + [false, 2], [false, null], [null, 3] ])"}); ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, - internal::GroupBy({table->GetColumnByName("argument"), - table->GetColumnByName("argument")}, - {table->GetColumnByName("key")}, - { - {"hash_any", nullptr}, - {"hash_all", nullptr}, - }, - use_threads)); + internal::GroupBy( + { + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + }, + {table->GetColumnByName("key")}, + { + {"hash_any", nullptr}, + {"hash_all", nullptr}, + {"hash_any", &options}, + {"hash_all", &options}, + }, + use_threads)); SortBy({"key_0"}, &aggregated_and_grouped); + // Group 1: trues and nulls + // Group 2: trues and falses + // Group 3: nulls + // Group 4: falses and nulls + // Group 5: trues + // Group null: falses AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_any", boolean()), + field("hash_all", boolean()), field("hash_any", boolean()), field("hash_all", boolean()), field("key_0", int64()), }), R"([ - [true, true, 1], - [true, false, 2], - [false, true, 3], - [false, false, null] + [true, true, true, null, 1], + [true, false, true, false, 2], + [false, true, null, null, 3], + [false, false, null, false, 4], + [true, true, true, true, 5], + [false, false, false, false, null] ])"), aggregated_and_grouped, /*verbose=*/true); diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 39bbbec3e16..25cb7b7822f 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -183,6 +183,9 @@ recommend you try it out. Unsupported input types return a ``TypeError`` Aggregations ------------ +Scalar aggregations operate on a (chunked) array or scalar value and reduce +the input to a single output value. + +---------------+-------+-------------+----------------+----------------------------------+-------+ | Function name | Arity | Input types | Output type | Options class | Notes | +===============+=======+=============+================+==================================+=======+ @@ -208,15 +211,16 @@ Aggregations +---------------+-------+-------------+----------------+----------------------------------+-------+ | sum | Unary | Numeric | Scalar Numeric | :struct:`ScalarAggregateOptions` | \(5) | +---------------+-------+-------------+----------------+----------------------------------+-------+ -| tdigest | Unary | Numeric | Scalar Float64 | :struct:`TDigestOptions` | | +| tdigest | Unary | Numeric | Scalar Float64 | :struct:`TDigestOptions` | \(7) | +---------------+-------+-------------+----------------+----------------------------------+-------+ | variance | Unary | Numeric | Scalar Float64 | :struct:`VarianceOptions` | | +---------------+-------+-------------+----------------+----------------------------------+-------+ Notes: -* \(1) If null values are taken into account by setting ScalarAggregateOptions - parameter skip_nulls = false then `Kleene logic`_ logic is applied. +* \(1) If null values are taken into account, by setting the + ScalarAggregateOptions parameter skip_nulls = false, then `Kleene logic`_ + logic is applied. The min_count option is not respected. * \(2) CountMode controls whether only non-null values are counted (the default), only null values are counted, or all values are counted. @@ -234,6 +238,93 @@ Notes: * \(6) Output is Float64 or input type, depending on QuantileOptions. +* \(7) tdigest/t-digest computes approximate quantiles, and so only needs a + fixed amount of memory. See the `reference implementation + `_ for details. + +Grouped Aggregations ("group by") +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Grouped aggregations are not directly invokable, but are used as part of a +SQL-style "group by" operation. Like scalar aggregations, grouped aggregations +reduce multiple input values to a single output value. Instead of aggregating +all values of the input, however, grouped aggregations partition the input +values on some set of "key" columns, then aggregate each group individually, +emitting one output value per input group. + +As an example, for the following table: + ++------------------+-----------------+ +| Column ``key`` | Column ``x`` | ++==================+=================+ +| "a" | 2 | ++------------------+-----------------+ +| "a" | 5 | ++------------------+-----------------+ +| "b" | null | ++------------------+-----------------+ +| "b" | null | ++------------------+-----------------+ +| null | null | ++------------------+-----------------+ +| null | 9 | ++------------------+-----------------+ + +we can compute a sum of the column ``x``, grouped on the column ``key``. +This gives us three groups, with the following results. Note that null is +treated as a distinct key value. + ++------------------+-----------------------+ +| Column ``key`` | Column ``sum(x)`` | ++==================+=======================+ +| "a" | 7 | ++------------------+-----------------------+ +| "b" | null | ++------------------+-----------------------+ +| null | 9 | ++------------------+-----------------------+ + +The supported aggregation functions are as follows. All function names are +prefixed with ``hash_``, which differentiates them from their scalar +equivalents above and reflects how they are implemented internally. + ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| Function name | Arity | Input types | Output type | Options class | Notes | ++===============+=======+=============+================+==================================+=======+ +| hash_all | Unary | Boolean | Boolean | :struct:`ScalarAggregateOptions` | \(1) | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_any | Unary | Boolean | Boolean | :struct:`ScalarAggregateOptions` | \(1) | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_count | Unary | Any | Int64 | :struct:`CountOptions` | \(2) | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_mean | Unary | Numeric | Float64 | :struct:`ScalarAggregateOptions` | | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_min_max | Unary | Numeric | Struct | :struct:`ScalarAggregateOptions` | \(3) | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_stddev | Unary | Numeric | Float64 | :struct:`VarianceOptions` | | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_sum | Unary | Numeric | Numeric | :struct:`ScalarAggregateOptions` | \(4) | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_tdigest | Unary | Numeric | Float64 | :struct:`TDigestOptions` | \(5) | ++---------------+-------+-------------+----------------+----------------------------------+-------+ +| hash_variance | Unary | Numeric | Float64 | :struct:`VarianceOptions` | | ++---------------+-------+-------------+----------------+----------------------------------+-------+ + +* \(1) If null values are taken into account, by setting the + :member:`ScalarAggregateOptions::skip_nulls` to false, then `Kleene logic`_ + logic is applied. The min_count option is not respected. + +* \(2) CountMode controls whether only non-null values are counted (the + default), only null values are counted, or all values are counted. + +* \(3) Output is a ``{"min": input type, "max": input type}`` Struct scalar. + +* \(4) Output is Int64, UInt64 or Float64, depending on the input type. + +* \(5) T-digest computes approximate quantiles, and so only needs a + fixed amount of memory. See the `reference implementation + `_ for details. + Element-wise ("scalar") functions ---------------------------------