From 670281ae5d4f6849a5b45a5a08f91b3e416a4bcb Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 11:51:54 -0400 Subject: [PATCH 1/7] ARROW-13764: [C++] Update docs --- docs/source/cpp/compute.rst | 56 ++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 2b39e3ca33a..7dbb8cea730 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -288,36 +288,42 @@ The supported aggregation functions are as follows. All function names are prefixed with ``hash_``, which differentiates them from their scalar equivalents above and reflects how they are implemented internally. -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| Function name | Arity | Input types | Output type | Options class | Notes | -+===============+=======+=============+=================+==================================+=======+ -| hash_all | Unary | Boolean | Boolean | :struct:`ScalarAggregateOptions` | \(1) | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_any | Unary | Boolean | Boolean | :struct:`ScalarAggregateOptions` | \(1) | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_count | Unary | Any | Int64 | :struct:`CountOptions` | \(2) | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_mean | Unary | Numeric | Decimal/Float64 | :struct:`ScalarAggregateOptions` | | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_min_max | Unary | Numeric | Struct | :struct:`ScalarAggregateOptions` | \(3) | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_product | Unary | Numeric | Numeric | :struct:`ScalarAggregateOptions` | \(4) | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_stddev | Unary | Numeric | Float64 | :struct:`VarianceOptions` | | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_sum | Unary | Numeric | Numeric | :struct:`ScalarAggregateOptions` | \(4) | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_tdigest | Unary | Numeric | Float64 | :struct:`TDigestOptions` | \(5) | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ -| hash_variance | Unary | Numeric | Float64 | :struct:`VarianceOptions` | | -+---------------+-------+-------------+-----------------+----------------------------------+-------+ ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| Function name | Arity | Input types | Output type | Options class | Notes | ++=====================+=======+=============+=================+==================================+=======+ +| hash_all | Unary | Boolean | Boolean | :struct:`ScalarAggregateOptions` | \(1) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_any | Unary | Boolean | Boolean | :struct:`ScalarAggregateOptions` | \(1) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_count | Unary | Any | Int64 | :struct:`CountOptions` | \(2) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_count_distinct | Unary | Any | Int64 | :struct:`CountOptions` | \(2) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_distinct | Unary | Any | Input type | :struct:`CountOptions` | \(2) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_mean | Unary | Numeric | Decimal/Float64 | :struct:`ScalarAggregateOptions` | | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_min_max | Unary | Numeric | Struct | :struct:`ScalarAggregateOptions` | \(3) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_product | Unary | Numeric | Numeric | :struct:`ScalarAggregateOptions` | \(4) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_stddev | Unary | Numeric | Float64 | :struct:`VarianceOptions` | | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_sum | Unary | Numeric | Numeric | :struct:`ScalarAggregateOptions` | \(4) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_tdigest | Unary | Numeric | Float64 | :struct:`TDigestOptions` | \(5) | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ +| hash_variance | Unary | Numeric | Float64 | :struct:`VarianceOptions` | | ++---------------------+-------+-------------+-----------------+----------------------------------+-------+ * \(1) If null values are taken into account, by setting the :member:`ScalarAggregateOptions::skip_nulls` to false, then `Kleene logic`_ logic is applied. The min_count option is not respected. -* \(2) CountMode controls whether only non-null values are counted (the - default), only null values are counted, or all values are counted. +* \(2) CountMode controls whether only non-null values are counted + (the default), only null values are counted, or all values are + counted. For hash_distinct, it instead controls whether null values + are emitted. * \(3) Output is a ``{"min": input type, "max": input type}`` Struct scalar. From d5f703b06a1c3a03bf7d99802cde567db8613c30 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 11:52:20 -0400 Subject: [PATCH 2/7] ARROW-13764: [C++] Update function signature --- .../arrow/compute/kernels/hash_aggregate.cc | 23 ++++++++---- .../compute/kernels/hash_aggregate_test.cc | 37 ++++++++++++++----- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 0a567e385e7..892c9a61ed9 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1970,6 +1970,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { Status Init(ExecContext* ctx, const FunctionOptions* options) override { ctx_ = ctx; pool_ = ctx->memory_pool(); + options_ = checked_cast(*options); return Status::OK(); } @@ -2026,6 +2027,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { ExecContext* ctx_; MemoryPool* pool_; int64_t num_groups_; + CountOptions options_; std::unique_ptr grouper_; std::shared_ptr out_type_; }; @@ -2383,22 +2385,26 @@ const FunctionDoc hash_all_doc{"Test whether all elements evaluate to true", const FunctionDoc hash_count_distinct_doc{ "Count the distinct values in each group", - ("Nulls are counted. NaNs and signed zeroes are not normalized."), - {"array", "group_id_array"}}; + ("Whether nulls/values are counted is controlled by CountOptions.\n" + "NaNs and signed zeroes are not normalized."), + {"array", "group_id_array"}, + "CountOptions"}; const FunctionDoc hash_distinct_doc{ "Keep the distinct values in each group", - ("Nulls are kept. NaNs and signed zeroes are not normalized."), - {"array", "group_id_array"}}; + ("Whether nulls/values are kept is controlled by CountOptions.\n" + "NaNs and signed zeroes are not normalized."), + {"array", "group_id_array"}, + "CountOptions"}; } // namespace void RegisterHashAggregateBasic(FunctionRegistry* registry) { + static auto default_count_options = CountOptions::Defaults(); static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); static auto default_tdigest_options = TDigestOptions::Defaults(); static auto default_variance_options = VarianceOptions::Defaults(); { - static auto default_count_options = CountOptions::Defaults(); auto func = std::make_shared( "hash_count", Arity::Binary(), &hash_count_doc, &default_count_options); @@ -2516,15 +2522,16 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { { auto func = std::make_shared( - "hash_count_distinct", Arity::Binary(), &hash_count_distinct_doc); + "hash_count_distinct", Arity::Binary(), &hash_count_distinct_doc, + &default_count_options); DCHECK_OK(func->AddKernel( MakeKernel(ValueDescr::ARRAY, GroupedDistinctInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } { - auto func = std::make_shared("hash_distinct", Arity::Binary(), - &hash_distinct_doc); + auto func = std::make_shared( + "hash_distinct", Arity::Binary(), &hash_distinct_doc, &default_count_options); DCHECK_OK(func->AddKernel( MakeKernel(ValueDescr::ARRAY, GroupedDistinctInit))); DCHECK_OK(registry->AddFunction(std::move(func))); diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index df2222a4eef..aec5df95634 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1240,6 +1240,9 @@ TEST(GroupBy, AnyAndAll) { } TEST(GroupBy, CountDistinct) { + CountOptions all(CountOptions::ALL); + CountOptions only_valid(CountOptions::ONLY_VALID); + CountOptions only_null(CountOptions::ONLY_NULL); for (bool use_threads : {true, false}) { SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); @@ -1250,6 +1253,7 @@ TEST(GroupBy, CountDistinct) { ])", R"([ [0, 2], + [null, 3], [null, 3] ])", R"([ @@ -1273,26 +1277,32 @@ TEST(GroupBy, CountDistinct) { internal::GroupBy( { table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), }, { table->GetColumnByName("key"), }, { - {"hash_count_distinct", nullptr}, + {"hash_count_distinct", &all}, + {"hash_count_distinct", &only_valid}, + {"hash_count_distinct", &only_null}, }, use_threads)); SortBy({"key_0"}, &aggregated_and_grouped); ValidateOutput(aggregated_and_grouped); AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_count_distinct", int64()), + field("hash_count_distinct", int64()), field("hash_count_distinct", int64()), field("key_0", int64()), }), R"([ - [1, 1], - [2, 2], - [3, 3], - [4, null] + [1, 1, 0, 1], + [2, 2, 0, 2], + [3, 2, 1, 3], + [4, 4, 0, null] ])"), aggregated_and_grouped, /*verbose=*/true); @@ -1304,6 +1314,7 @@ TEST(GroupBy, CountDistinct) { ])", R"([ ["bar", 2], + [null, 3], [null, 3] ])", R"([ @@ -1327,26 +1338,32 @@ TEST(GroupBy, CountDistinct) { internal::GroupBy( { table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), }, { table->GetColumnByName("key"), }, { - {"hash_count_distinct", nullptr}, + {"hash_count_distinct", &all}, + {"hash_count_distinct", &only_valid}, + {"hash_count_distinct", &only_null}, }, use_threads)); ValidateOutput(aggregated_and_grouped); SortBy({"key_0"}, &aggregated_and_grouped); AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_count_distinct", int64()), + field("hash_count_distinct", int64()), field("hash_count_distinct", int64()), field("key_0", int64()), }), R"([ - [1, 1], - [2, 2], - [3, 3], - [4, null] + [1, 1, 0, 1], + [2, 2, 0, 2], + [3, 2, 1, 3], + [4, 4, 0, null] ])"), aggregated_and_grouped, /*verbose=*/true); From 48a3128f35cf2a2b2ddf4351c60016289e05257d Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 11:52:27 -0400 Subject: [PATCH 3/7] ARROW-13764: [C++] Implement in Finalize() --- cpp/src/arrow/compute/kernels/hash_aggregate.cc | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 892c9a61ed9..eccded450ae 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -2014,8 +2014,21 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); auto* g = uniques[1].array()->GetValues(1); - for (int64_t i = 0; i < uniques.length; i++) { - counts[g[i]]++; + const auto& items = *uniques[0].array(); + const auto* valid = items.GetValues(0, 0); + if (options_.mode == CountOptions::ALL || + (options_.mode == CountOptions::ONLY_VALID && !valid)) { + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]]++; + } + } else if (options_.mode == CountOptions::ONLY_VALID) { + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]] += BitUtil::GetBit(valid, items.offset + i); + } + } else { // ONLY_NULL + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]] += !BitUtil::GetBit(valid, items.offset + i); + } } return ArrayData::Make(int64(), num_groups_, {nullptr, std::move(values)}, From 31cf5bd7a11200f008d07b66a0d663d14365af8a Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 12:31:40 -0400 Subject: [PATCH 4/7] ARROW-13764: [C++] Implement by filtering before consume --- .../arrow/compute/kernels/hash_aggregate.cc | 50 ++++++++++----- .../compute/kernels/hash_aggregate_test.cc | 63 ++++++++++++++++--- 2 files changed, 90 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index eccded450ae..49b7d87522b 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -38,6 +38,7 @@ #include "arrow/compute/kernels/aggregate_var_std_internal.h" #include "arrow/compute/kernels/common.h" #include "arrow/compute/kernels/util_internal.h" +#include "arrow/record_batch.h" #include "arrow/util/bit_run_reader.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/bitmap_writer.h" @@ -1980,7 +1981,37 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { } Status Consume(const ExecBatch& batch) override { - return grouper_->Consume(batch).status(); + const auto& values = *batch[0].array(); + if (options_.mode == CountOptions::ALL || + (options_.mode == CountOptions::ONLY_VALID && values.GetNullCount() == 0)) { + return grouper_->Consume(batch).status(); + } + + FieldVector fields; + fields.reserve(batch.num_values()); + for (const auto& value : batch.values) { + fields.push_back(field("", value.type())); + } + ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema(std::move(fields)), pool_)); + + if (options_.mode == CountOptions::ONLY_VALID) { + auto filter = std::make_shared(batch.length, values.buffers[0]); + ARROW_ASSIGN_OR_RAISE(auto filtered, + Filter(rb, filter, FilterOptions(FilterOptions::DROP), ctx_)); + return grouper_->Consume(ExecBatch(*filtered.record_batch())).status(); + } + // ONLY_NULL + if (values.GetNullCount() == 0) return Status::OK(); + // This branch is...fairly pointless, hence the naive implementation here, + // but if we do care about performance, we can write a specialized kernel + // implementation. + ARROW_ASSIGN_OR_RAISE(auto mask, + arrow::internal::InvertBitmap(pool_, values.buffers[0]->data(), + values.offset, values.length)); + auto filter = std::make_shared(batch.length, mask); + ARROW_ASSIGN_OR_RAISE(auto filtered, + Filter(rb, filter, FilterOptions(FilterOptions::DROP), ctx_)); + return grouper_->Consume(ExecBatch(*filtered.record_batch())).status(); } Status Merge(GroupedAggregator&& raw_other, @@ -2014,21 +2045,8 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); auto* g = uniques[1].array()->GetValues(1); - const auto& items = *uniques[0].array(); - const auto* valid = items.GetValues(0, 0); - if (options_.mode == CountOptions::ALL || - (options_.mode == CountOptions::ONLY_VALID && !valid)) { - for (int64_t i = 0; i < uniques.length; i++) { - counts[g[i]]++; - } - } else if (options_.mode == CountOptions::ONLY_VALID) { - for (int64_t i = 0; i < uniques.length; i++) { - counts[g[i]] += BitUtil::GetBit(valid, items.offset + i); - } - } else { // ONLY_NULL - for (int64_t i = 0; i < uniques.length; i++) { - counts[g[i]] += !BitUtil::GetBit(valid, items.offset + i); - } + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]]++; } return ArrayData::Make(int64(), num_groups_, {nullptr, std::move(values)}, diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index aec5df95634..56493a6cf07 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1255,6 +1255,10 @@ TEST(GroupBy, CountDistinct) { [0, 2], [null, 3], [null, 3] +])", + R"([ + [null, 4], + [null, 4] ])", R"([ [4, null], @@ -1302,6 +1306,7 @@ TEST(GroupBy, CountDistinct) { [1, 1, 0, 1], [2, 2, 0, 2], [3, 2, 1, 3], + [1, 0, 1, 4], [4, 4, 0, null] ])"), aggregated_and_grouped, @@ -1316,6 +1321,10 @@ TEST(GroupBy, CountDistinct) { ["bar", 2], [null, 3], [null, 3] +])", + R"([ + [null, 4], + [null, 4] ])", R"([ ["baz", null], @@ -1363,6 +1372,7 @@ TEST(GroupBy, CountDistinct) { [1, 1, 0, 1], [2, 2, 0, 2], [3, 2, 1, 3], + [1, 0, 1, 4], [4, 4, 0, null] ])"), aggregated_and_grouped, @@ -1371,6 +1381,9 @@ TEST(GroupBy, CountDistinct) { } TEST(GroupBy, Distinct) { + CountOptions all(CountOptions::ALL); + CountOptions only_valid(CountOptions::ONLY_VALID); + CountOptions only_null(CountOptions::ONLY_NULL); for (bool use_threads : {true, false}) { SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); @@ -1381,7 +1394,12 @@ TEST(GroupBy, Distinct) { ])", R"([ ["bar", 2], + [null, 3], [null, 3] +])", + R"([ + [null, 4], + [null, 4] ])", R"([ ["baz", null], @@ -1404,12 +1422,16 @@ TEST(GroupBy, Distinct) { internal::GroupBy( { table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), }, { table->GetColumnByName("key"), }, { - {"hash_distinct", nullptr}, + {"hash_distinct", &all}, + {"hash_distinct", &only_valid}, + {"hash_distinct", &only_null}, }, use_threads)); ValidateOutput(aggregated_and_grouped); @@ -1417,20 +1439,47 @@ TEST(GroupBy, Distinct) { // Order of sub-arrays is not stable auto struct_arr = aggregated_and_grouped.array_as(); - auto distinct_arr = checked_pointer_cast(struct_arr->field(0)); auto sort = [](const Array& arr) -> std::shared_ptr { EXPECT_OK_AND_ASSIGN(auto indices, SortIndices(arr)); EXPECT_OK_AND_ASSIGN(auto sorted, Take(arr, indices)); return sorted.make_array(); }; - AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo"])"), - sort(*distinct_arr->value_slice(0)), /*verbose=*/true); + + auto all_arr = checked_pointer_cast(struct_arr->field(0)); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo"])"), sort(*all_arr->value_slice(0)), + /*verbose=*/true); AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["bar", "spam"])"), - sort(*distinct_arr->value_slice(1)), /*verbose=*/true); + sort(*all_arr->value_slice(1)), /*verbose=*/true); AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo", "ham", null])"), - sort(*distinct_arr->value_slice(2)), /*verbose=*/true); + sort(*all_arr->value_slice(2)), /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"([null])"), sort(*all_arr->value_slice(3)), + /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["a", "b", "baz", "eggs"])"), + sort(*all_arr->value_slice(4)), /*verbose=*/true); + + auto valid_arr = checked_pointer_cast(struct_arr->field(1)); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo"])"), + sort(*valid_arr->value_slice(0)), /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["bar", "spam"])"), + sort(*valid_arr->value_slice(1)), /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo", "ham"])"), + sort(*valid_arr->value_slice(2)), /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"([])"), sort(*valid_arr->value_slice(3)), + /*verbose=*/true); AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["a", "b", "baz", "eggs"])"), - sort(*distinct_arr->value_slice(3)), /*verbose=*/true); + sort(*valid_arr->value_slice(4)), /*verbose=*/true); + + auto null_arr = checked_pointer_cast(struct_arr->field(2)); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"([])"), sort(*null_arr->value_slice(0)), + /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"([])"), sort(*null_arr->value_slice(1)), + /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"([null])"), sort(*null_arr->value_slice(2)), + /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"([null])"), sort(*null_arr->value_slice(3)), + /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"([])"), sort(*null_arr->value_slice(4)), + /*verbose=*/true); } } From 973594d47c377ac19cda688f975bf99669d35849 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 14:07:36 -0400 Subject: [PATCH 5/7] ARROW-13764: [C++] Implement by filtering in finalize --- .../arrow/compute/kernels/hash_aggregate.cc | 94 ++++++++++++------- 1 file changed, 60 insertions(+), 34 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 49b7d87522b..bcaac8c6bf3 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1981,37 +1981,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { } Status Consume(const ExecBatch& batch) override { - const auto& values = *batch[0].array(); - if (options_.mode == CountOptions::ALL || - (options_.mode == CountOptions::ONLY_VALID && values.GetNullCount() == 0)) { - return grouper_->Consume(batch).status(); - } - - FieldVector fields; - fields.reserve(batch.num_values()); - for (const auto& value : batch.values) { - fields.push_back(field("", value.type())); - } - ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema(std::move(fields)), pool_)); - - if (options_.mode == CountOptions::ONLY_VALID) { - auto filter = std::make_shared(batch.length, values.buffers[0]); - ARROW_ASSIGN_OR_RAISE(auto filtered, - Filter(rb, filter, FilterOptions(FilterOptions::DROP), ctx_)); - return grouper_->Consume(ExecBatch(*filtered.record_batch())).status(); - } - // ONLY_NULL - if (values.GetNullCount() == 0) return Status::OK(); - // This branch is...fairly pointless, hence the naive implementation here, - // but if we do care about performance, we can write a specialized kernel - // implementation. - ARROW_ASSIGN_OR_RAISE(auto mask, - arrow::internal::InvertBitmap(pool_, values.buffers[0]->data(), - values.offset, values.length)); - auto filter = std::make_shared(batch.length, mask); - ARROW_ASSIGN_OR_RAISE(auto filtered, - Filter(rb, filter, FilterOptions(FilterOptions::DROP), ctx_)); - return grouper_->Consume(ExecBatch(*filtered.record_batch())).status(); + return grouper_->Consume(batch).status(); } Status Merge(GroupedAggregator&& raw_other, @@ -2045,8 +2015,21 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); auto* g = uniques[1].array()->GetValues(1); - for (int64_t i = 0; i < uniques.length; i++) { - counts[g[i]]++; + const auto& items = *uniques[0].array(); + const auto* valid = items.GetValues(0, 0); + if (options_.mode == CountOptions::ALL || + (options_.mode == CountOptions::ONLY_VALID && !valid)) { + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]]++; + } + } else if (options_.mode == CountOptions::ONLY_VALID) { + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]] += BitUtil::GetBit(valid, items.offset + i); + } + } else { // ONLY_NULL + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]] += !BitUtil::GetBit(valid, items.offset + i); + } } return ArrayData::Make(int64(), num_groups_, {nullptr, std::move(values)}, @@ -2069,7 +2052,50 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { ARROW_ASSIGN_OR_RAISE(auto groupings, grouper_->MakeGroupings( *uniques[1].array_as(), static_cast(num_groups_), ctx_)); - return grouper_->ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_); + ARROW_ASSIGN_OR_RAISE( + auto list, grouper_->ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_)); + auto values = list->values(); + int32_t* offsets = reinterpret_cast(list->value_offsets()->mutable_data()); + if (options_.mode == CountOptions::ALL || + (options_.mode == CountOptions::ONLY_VALID && values->null_count() == 0)) { + return list; + } else if (options_.mode == CountOptions::ONLY_VALID) { + int32_t prev_offset = offsets[0]; + for (int64_t i = 0; i < list->length(); i++) { + const int64_t slot_length = offsets[i + 1] - prev_offset; + const int64_t null_count = + slot_length - arrow::internal::CountSetBits(values->null_bitmap()->data(), + prev_offset, slot_length); + const int64_t offset = null_count > 0 ? slot_length - 1 : slot_length; + prev_offset = offsets[i + 1]; + offsets[i + 1] = offsets[i] + offset; + } + auto filter = + std::make_shared(values->length(), values->null_bitmap()); + ARROW_ASSIGN_OR_RAISE( + auto new_values, + Filter(std::move(values), filter, FilterOptions(FilterOptions::DROP), ctx_)); + return std::make_shared(list->type(), list->length(), + list->value_offsets(), new_values.make_array()); + } + // ONLY_NULL + int32_t prev_offset = offsets[0]; + for (int64_t i = 0; i < list->length(); i++) { + const int64_t slot_length = offsets[i + 1] - prev_offset; + const int64_t null_count = + slot_length - arrow::internal::CountSetBits(values->null_bitmap()->data(), + prev_offset, slot_length); + const int64_t offset = null_count > 0 ? 1 : 0; + prev_offset = offsets[i + 1]; + offsets[i + 1] = offsets[i] + offset; + } + ARROW_ASSIGN_OR_RAISE( + auto new_values, + MakeArrayOfNull(out_type_, + list->length() > 0 ? offsets[list->length()] - offsets[0] : 0, + pool_)); + return std::make_shared(list->type(), list->length(), + list->value_offsets(), std::move(new_values)); } std::shared_ptr out_type() const override { return list(out_type_); } From e13714e0ba959deb6e876bece300717bf7e6de18 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 26 Aug 2021 15:03:55 -0400 Subject: [PATCH 6/7] ARROW-13764: [C++] Fix data types --- cpp/src/arrow/compute/kernels/hash_aggregate.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index bcaac8c6bf3..5e60535aa3f 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -2062,11 +2062,11 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { } else if (options_.mode == CountOptions::ONLY_VALID) { int32_t prev_offset = offsets[0]; for (int64_t i = 0; i < list->length(); i++) { - const int64_t slot_length = offsets[i + 1] - prev_offset; + const int32_t slot_length = offsets[i + 1] - prev_offset; const int64_t null_count = slot_length - arrow::internal::CountSetBits(values->null_bitmap()->data(), prev_offset, slot_length); - const int64_t offset = null_count > 0 ? slot_length - 1 : slot_length; + const int32_t offset = null_count > 0 ? slot_length - 1 : slot_length; prev_offset = offsets[i + 1]; offsets[i + 1] = offsets[i] + offset; } @@ -2081,11 +2081,11 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { // ONLY_NULL int32_t prev_offset = offsets[0]; for (int64_t i = 0; i < list->length(); i++) { - const int64_t slot_length = offsets[i + 1] - prev_offset; + const int32_t slot_length = offsets[i + 1] - prev_offset; const int64_t null_count = slot_length - arrow::internal::CountSetBits(values->null_bitmap()->data(), prev_offset, slot_length); - const int64_t offset = null_count > 0 ? 1 : 0; + const int32_t offset = null_count > 0 ? 1 : 0; prev_offset = offsets[i + 1]; offsets[i + 1] = offsets[i] + offset; } From 5ce4fb2285e581dc2e3744e479e342f14b1610ac Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 30 Aug 2021 10:20:05 -0400 Subject: [PATCH 7/7] ARROW-13764: [C++] Address review feedback --- .../arrow/compute/kernels/hash_aggregate.cc | 26 +++--- .../compute/kernels/hash_aggregate_test.cc | 85 ++++++++++++++++++- docs/source/cpp/compute.rst | 3 +- 3 files changed, 102 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 5e60535aa3f..c099ec660b8 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -2026,7 +2026,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { for (int64_t i = 0; i < uniques.length; i++) { counts[g[i]] += BitUtil::GetBit(valid, items.offset + i); } - } else { // ONLY_NULL + } else if (valid) { // ONLY_NULL for (int64_t i = 0; i < uniques.length; i++) { counts[g[i]] += !BitUtil::GetBit(valid, items.offset + i); } @@ -2055,6 +2055,7 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { ARROW_ASSIGN_OR_RAISE( auto list, grouper_->ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_)); auto values = list->values(); + DCHECK_EQ(values->offset(), 0); int32_t* offsets = reinterpret_cast(list->value_offsets()->mutable_data()); if (options_.mode == CountOptions::ALL || (options_.mode == CountOptions::ONLY_VALID && values->null_count() == 0)) { @@ -2066,6 +2067,7 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { const int64_t null_count = slot_length - arrow::internal::CountSetBits(values->null_bitmap()->data(), prev_offset, slot_length); + DCHECK_LE(null_count, 1); const int32_t offset = null_count > 0 ? slot_length - 1 : slot_length; prev_offset = offsets[i + 1]; offsets[i + 1] = offsets[i] + offset; @@ -2079,15 +2081,19 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { list->value_offsets(), new_values.make_array()); } // ONLY_NULL - int32_t prev_offset = offsets[0]; - for (int64_t i = 0; i < list->length(); i++) { - const int32_t slot_length = offsets[i + 1] - prev_offset; - const int64_t null_count = - slot_length - arrow::internal::CountSetBits(values->null_bitmap()->data(), - prev_offset, slot_length); - const int32_t offset = null_count > 0 ? 1 : 0; - prev_offset = offsets[i + 1]; - offsets[i + 1] = offsets[i] + offset; + if (values->null_count() == 0) { + std::fill(offsets + 1, offsets + list->length() + 1, offsets[0]); + } else { + int32_t prev_offset = offsets[0]; + for (int64_t i = 0; i < list->length(); i++) { + const int32_t slot_length = offsets[i + 1] - prev_offset; + const int64_t null_count = + slot_length - arrow::internal::CountSetBits(values->null_bitmap()->data(), + prev_offset, slot_length); + const int32_t offset = null_count > 0 ? 1 : 0; + prev_offset = offsets[i + 1]; + offsets[i + 1] = offsets[i] + offset; + } } ARROW_ASSIGN_OR_RAISE( auto new_values, diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 56493a6cf07..4d6064fa62d 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1374,6 +1374,49 @@ TEST(GroupBy, CountDistinct) { [3, 2, 1, 3], [1, 0, 1, 4], [4, 4, 0, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + + table = + TableFromJSON(schema({field("argument", utf8()), field("key", int64())}), { + R"([ + ["foo", 1], + ["foo", 1], + ["bar", 2], + ["bar", 2], + ["spam", 2] +])", + }); + + ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, + internal::GroupBy( + { + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + }, + { + table->GetColumnByName("key"), + }, + { + {"hash_count_distinct", &all}, + {"hash_count_distinct", &only_valid}, + {"hash_count_distinct", &only_null}, + }, + use_threads)); + ValidateOutput(aggregated_and_grouped); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_count_distinct", int64()), + field("hash_count_distinct", int64()), + field("hash_count_distinct", int64()), + field("key_0", int64()), + }), + R"([ + [1, 1, 0, 1], + [2, 2, 0, 2] ])"), aggregated_and_grouped, /*verbose=*/true); @@ -1438,13 +1481,14 @@ TEST(GroupBy, Distinct) { SortBy({"key_0"}, &aggregated_and_grouped); // Order of sub-arrays is not stable - auto struct_arr = aggregated_and_grouped.array_as(); auto sort = [](const Array& arr) -> std::shared_ptr { EXPECT_OK_AND_ASSIGN(auto indices, SortIndices(arr)); EXPECT_OK_AND_ASSIGN(auto sorted, Take(arr, indices)); return sorted.make_array(); }; + auto struct_arr = aggregated_and_grouped.array_as(); + auto all_arr = checked_pointer_cast(struct_arr->field(0)); AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo"])"), sort(*all_arr->value_slice(0)), /*verbose=*/true); @@ -1480,6 +1524,45 @@ TEST(GroupBy, Distinct) { /*verbose=*/true); AssertDatumsEqual(ArrayFromJSON(utf8(), R"([])"), sort(*null_arr->value_slice(4)), /*verbose=*/true); + + table = + TableFromJSON(schema({field("argument", utf8()), field("key", int64())}), { + R"([ + ["foo", 1], + ["foo", 1], + ["bar", 2], + ["bar", 2] +])", + }); + ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, + internal::GroupBy( + { + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + }, + { + table->GetColumnByName("key"), + }, + { + {"hash_distinct", &all}, + {"hash_distinct", &only_valid}, + {"hash_distinct", &only_null}, + }, + use_threads)); + ValidateOutput(aggregated_and_grouped); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual( + ArrayFromJSON(struct_({ + field("hash_distinct", list(utf8())), + field("hash_distinct", list(utf8())), + field("hash_distinct", list(utf8())), + field("key_0", int64()), + }), + R"([[["foo"], ["foo"], [], 1], [["bar"], ["bar"], [], 2]])"), + aggregated_and_grouped, + /*verbose=*/true); } } diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 7dbb8cea730..465500e8dae 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -323,7 +323,8 @@ equivalents above and reflects how they are implemented internally. * \(2) CountMode controls whether only non-null values are counted (the default), only null values are counted, or all values are counted. For hash_distinct, it instead controls whether null values - are emitted. + are emitted. This never affects the grouping keys, only group values + (i.e. you may get a group where the key is null). * \(3) Output is a ``{"min": input type, "max": input type}`` Struct scalar.