From 9053eeb06ac09943d87c5ba2714291bddd4198be Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 4 Aug 2021 12:50:36 -0400 Subject: [PATCH 01/11] ARROW-12728: [C++] Add count_distinct hash aggregate kernel --- .../arrow/compute/kernels/hash_aggregate.cc | 77 +++++++++++ .../compute/kernels/hash_aggregate_test.cc | 120 ++++++++++++++++++ 2 files changed, 197 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 20dcd8ef331..d1d7bef4c66 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1962,6 +1962,70 @@ struct GroupedAllImpl : public GroupedBooleanAggregator { num_groups, /*out_offset=*/0, no_nulls); } }; + +// ---------------------------------------------------------------------- +// CountDistinct/Distinct implementation + +struct GroupedCountDistinctImpl : public GroupedAggregator { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + ctx_ = ctx; + pool_ = ctx->memory_pool(); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + if (!grouper_) { + ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(batch.GetDescriptors(), ctx_)); + } + return grouper_->Consume(batch).status(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast(&raw_other); + + // Get (group_id, value) pairs, then translate the group IDs and consume them + // ourselves + ARROW_ASSIGN_OR_RAISE(auto uniques, other->grouper_->GetUniques()); + + const auto* g_mapping = group_id_mapping.GetValues(1); + auto* other_g = uniques[1].array()->GetMutableValues(1); + for (int64_t i = 0; i < uniques.length; i++) { + other_g[i] = g_mapping[other_g[i]]; + } + + return Consume(std::move(uniques)); + } + + Result Finalize() override { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr values, + AllocateBuffer(num_groups_ * sizeof(int64_t), pool_)); + int64_t* counts = reinterpret_cast(values->mutable_data()); + std::fill(counts, counts + num_groups_, 0); + + ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); + auto* g = uniques[1].array()->GetValues(1); + for (int64_t i = 0; i < uniques.length; i++) { + counts[g[i]]++; + } + + return ArrayData::Make(int64(), num_groups_, {nullptr, std::move(values)}, + /*null_count=*/0); + } + + std::shared_ptr out_type() const override { return int64(); } + + ExecContext* ctx_; + MemoryPool* pool_; + uint32_t num_groups_; + std::unique_ptr grouper_; +}; + } // namespace Result> GetKernels( @@ -2289,6 +2353,11 @@ const FunctionDoc hash_all_doc{"Test whether all elements evaluate to true", ("Null values are ignored."), {"array", "group_id_array"}, "ScalarAggregateOptions"}; + +const FunctionDoc hash_count_distinct_doc{ + "Count the distinct values in each group", + ("Nulls are counted. 
NaNs and signed zeroes are not normalized."), + {"array", "group_id_array"}}; } // namespace void RegisterHashAggregateBasic(FunctionRegistry* registry) { @@ -2412,6 +2481,14 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { DCHECK_OK(func->AddKernel(MakeKernel(boolean(), HashAggregateInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } + + { + auto func = std::make_shared( + "hash_count_distinct", Arity::Binary(), &hash_count_distinct_doc); + DCHECK_OK(func->AddKernel( + MakeKernel(ValueDescr::ARRAY, HashAggregateInit))); + DCHECK_OK(registry->AddFunction(std::move(func))); + } } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index a160461b5dc..8b89fa47491 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1239,6 +1239,126 @@ TEST(GroupBy, AnyAndAll) { } } +TEST(GroupBy, CountDistinct) { + for (bool use_threads : {true, false}) { + SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); + + auto table = + TableFromJSON(schema({field("argument", float64()), field("key", int64())}), {R"([ + [1, 1], + [1, 1] +])", + R"([ + [0, 2], + [null, 3] +])", + R"([ + [4, null], + [1, 3] +])", + R"([ + [0, 2], + [-1, 2] +])", + R"([ + [1, null], + [NaN, 3] + ])", + R"([ + [2, null], + [3, null] + ])"}); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + internal::GroupBy( + { + table->GetColumnByName("argument"), + }, + { + table->GetColumnByName("key"), + }, + { + {"hash_count_distinct", nullptr}, + }, + use_threads)); + + { + // Order is not stable + EXPECT_EQ(4, aggregated_and_grouped.length()); + const int64_t* counts = + aggregated_and_grouped.array()->child_data[0]->GetValues(1); + const uint8_t* keys_valid = + aggregated_and_grouped.array()->child_data[1]->GetValues(0, 0); + const int64_t* keys = + aggregated_and_grouped.array()->child_data[1]->GetValues(1); + for (int i = 0; i < aggregated_and_grouped.length(); i++) { + if (BitUtil::GetBit(keys_valid, i)) { + EXPECT_EQ(keys[i], counts[i]); + } else { + EXPECT_EQ(4, counts[i]); + } + } + } + + table = + TableFromJSON(schema({field("argument", utf8()), field("key", int64())}), {R"([ + ["foo", 1], + ["foo", 1] +])", + R"([ + ["bar", 2], + [null, 3] +])", + R"([ + ["baz", null], + ["foo", 3] +])", + R"([ + ["bar", 2], + ["spam", 2] +])", + R"([ + ["eggs", null], + ["ham", 3] + ])", + R"([ + ["a", null], + ["b", null] + ])"}); + + ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, + internal::GroupBy( + { + table->GetColumnByName("argument"), + }, + { + table->GetColumnByName("key"), + }, + { + {"hash_count_distinct", nullptr}, + }, + use_threads)); + + { + // Order is not stable + EXPECT_EQ(4, aggregated_and_grouped.length()); + const int64_t* counts = + aggregated_and_grouped.array()->child_data[0]->GetValues(1); + const uint8_t* keys_valid = + aggregated_and_grouped.array()->child_data[1]->GetValues(0, 0); + const int64_t* keys = + aggregated_and_grouped.array()->child_data[1]->GetValues(1); + for (int i = 0; i < aggregated_and_grouped.length(); i++) { + if (BitUtil::GetBit(keys_valid, i)) { + EXPECT_EQ(keys[i], counts[i]); + } else { + EXPECT_EQ(4, counts[i]); + } + } + } + } +} + TEST(GroupBy, CountAndSum) { auto batch = RecordBatchFromJSON( schema({field("argument", float64()), field("key", int64())}), R"([ From 53e807bbf1c09dde1a97271b43bfd26935f2f396 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 4 Aug 2021 13:44:31 -0400 Subject: [PATCH 
02/11] ARROW-12728: [C++] Add distinct hash aggregate kernel --- .../arrow/compute/kernels/hash_aggregate.cc | 56 ++++++++++++ .../compute/kernels/hash_aggregate_test.cc | 85 +++++++++++++++++++ 2 files changed, 141 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index d1d7bef4c66..aebb5459cee 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -24,6 +24,7 @@ #include #include +#include "arrow/array/concatenate.h" #include "arrow/buffer_builder.h" #include "arrow/compute/api_aggregate.h" #include "arrow/compute/api_vector.h" @@ -2026,6 +2027,49 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { std::unique_ptr grouper_; }; +struct GroupedDistinctImpl : public GroupedCountDistinctImpl { + Result Finalize() override { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr offsets_buffer, + AllocateBuffer((num_groups_ + 1) * sizeof(int32_t), pool_)); + int32_t* offsets = reinterpret_cast(offsets_buffer->mutable_data()); + ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); + + // Assemble the final list by concatenating slices + std::vector>> grouped_slices(num_groups_); + std::vector> all_slices; + all_slices.reserve(uniques.length); + + const auto values = uniques[0].make_array(); + auto* g = uniques[1].array()->GetValues(1); + for (int64_t i = 0; i < uniques.length; i++) { + grouped_slices[g[i]].push_back(values->Slice(i, /*length=*/1)); + } + offsets[0] = 0; + for (size_t i = 0; i < grouped_slices.size(); i++) { + all_slices.insert(all_slices.end(), + std::make_move_iterator(grouped_slices[i].begin()), + std::make_move_iterator(grouped_slices[i].end())); + offsets[i + 1] = static_cast(all_slices.size()); + } + ARROW_ASSIGN_OR_RAISE(auto child_array, Concatenate(all_slices, ctx_->memory_pool())); + + return ArrayData::Make(out_type(), num_groups_, {nullptr, std::move(offsets_buffer)}, + {child_array->data()}, + /*null_count=*/0); + } + + std::shared_ptr out_type() const override { return list(out_type_); } + + std::shared_ptr out_type_; +}; + +Result> GroupedDistinctInit(KernelContext* ctx, + const KernelInitArgs& args) { + ARROW_ASSIGN_OR_RAISE(auto impl, HashAggregateInit(ctx, args)); + static_cast(impl.get())->out_type_ = args.inputs[0].type; + return std::move(impl); +} + } // namespace Result> GetKernels( @@ -2358,6 +2402,11 @@ const FunctionDoc hash_count_distinct_doc{ "Count the distinct values in each group", ("Nulls are counted. NaNs and signed zeroes are not normalized."), {"array", "group_id_array"}}; + +const FunctionDoc hash_distinct_doc{ + "Keep the distinct values in each group", + ("Nulls are kept. 
NaNs and signed zeroes are not normalized."), + {"array", "group_id_array"}}; } // namespace void RegisterHashAggregateBasic(FunctionRegistry* registry) { @@ -2489,6 +2538,13 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { MakeKernel(ValueDescr::ARRAY, HashAggregateInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } + + { + auto func = std::make_shared("hash_distinct", Arity::Binary(), + &hash_distinct_doc); + DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY, GroupedDistinctInit))); + DCHECK_OK(registry->AddFunction(std::move(func))); + } } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 8b89fa47491..93b9e9216ea 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1359,6 +1359,91 @@ TEST(GroupBy, CountDistinct) { } } +TEST(GroupBy, Distinct) { + for (bool use_threads : {true, false}) { + SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); + + auto table = + TableFromJSON(schema({field("argument", utf8()), field("key", int64())}), {R"([ + ["foo", 1], + ["foo", 1] +])", + R"([ + ["bar", 2], + [null, 3] +])", + R"([ + ["baz", null], + ["foo", 3] +])", + R"([ + ["bar", 2], + ["spam", 2] +])", + R"([ + ["eggs", null], + ["ham", 3] + ])", + R"([ + ["a", null], + ["b", null] + ])"}); + + ASSERT_OK_AND_ASSIGN(auto aggregated_and_grouped, + internal::GroupBy( + { + table->GetColumnByName("argument"), + }, + { + table->GetColumnByName("key"), + }, + { + {"hash_distinct", nullptr}, + }, + use_threads)); + + { + // Order is not stable + EXPECT_EQ(4, aggregated_and_grouped.length()); + const auto uniques = std::static_pointer_cast( + aggregated_and_grouped.array_as()->GetFieldByName( + "hash_distinct")); + const uint8_t* keys_valid = + aggregated_and_grouped.array()->child_data[1]->GetValues(0, 0); + const int64_t* keys = + aggregated_and_grouped.array()->child_data[1]->GetValues(1); + for (int i = 0; i < aggregated_and_grouped.length(); i++) { + auto values = std::static_pointer_cast(uniques->value_slice(i)); + std::vector c_values; + for (int i = 0; i < values->length(); i++) { + if (values->IsValid(i)) { + c_values.push_back(values->GetView(i)); + } + } + + if (BitUtil::GetBit(keys_valid, i)) { + EXPECT_EQ(keys[i], values->length()); + if (keys[i] == 1) { + EXPECT_EQ(0, values->null_count()); + EXPECT_THAT(c_values, ::testing::UnorderedElementsAreArray({"foo"})); + } else if (keys[i] == 2) { + EXPECT_EQ(0, values->null_count()); + EXPECT_THAT(c_values, ::testing::UnorderedElementsAreArray({"bar", "spam"})); + } else if (keys[i] == 3) { + EXPECT_EQ(1, values->null_count()); + EXPECT_THAT(c_values, ::testing::UnorderedElementsAreArray({"foo", "ham"})); + } + } else { + EXPECT_EQ(4, values->length()); + EXPECT_EQ(0, values->null_count()); + EXPECT_THAT(c_values, + ::testing::UnorderedElementsAreArray({"a", "b", "baz", "eggs"})); + } + } + } + } +} + TEST(GroupBy, CountAndSum) { auto batch = RecordBatchFromJSON( schema({field("argument", float64()), field("key", int64())}), R"([ From b29f3786b8b8f2ae79cb14641acc0873ab886aee Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 4 Aug 2021 14:12:38 -0400 Subject: [PATCH 03/11] ARROW-12728: [C++] Fix type --- cpp/src/arrow/compute/kernels/hash_aggregate.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 
aebb5459cee..461d95571c4 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -2023,7 +2023,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { ExecContext* ctx_; MemoryPool* pool_; - uint32_t num_groups_; + int64_t num_groups_; std::unique_ptr grouper_; }; From 9d0ceaf5a3318db59e6a661f07818dc4ce80163c Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 4 Aug 2021 16:25:39 -0400 Subject: [PATCH 04/11] ARROW-12728: [C++] Assemble results of distinct via Take --- .../arrow/compute/kernels/hash_aggregate.cc | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 461d95571c4..5d7675b63b0 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -24,7 +24,6 @@ #include #include -#include "arrow/array/concatenate.h" #include "arrow/buffer_builder.h" #include "arrow/compute/api_aggregate.h" #include "arrow/compute/api_vector.h" @@ -2029,32 +2028,40 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { struct GroupedDistinctImpl : public GroupedCountDistinctImpl { Result Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); + + // Assemble the final list via Take + ARROW_ASSIGN_OR_RAISE(std::shared_ptr offsets_buffer, AllocateBuffer((num_groups_ + 1) * sizeof(int32_t), pool_)); int32_t* offsets = reinterpret_cast(offsets_buffer->mutable_data()); - ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); - // Assemble the final list by concatenating slices - std::vector>> grouped_slices(num_groups_); - std::vector> all_slices; - all_slices.reserve(uniques.length); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr take_indices, + AllocateBuffer(uniques.length * sizeof(int64_t), pool_)); + int32_t* indices = reinterpret_cast(take_indices->mutable_data()); + + std::vector> grouped_slices(num_groups_); - const auto values = uniques[0].make_array(); auto* g = uniques[1].array()->GetValues(1); - for (int64_t i = 0; i < uniques.length; i++) { - grouped_slices[g[i]].push_back(values->Slice(i, /*length=*/1)); + for (int32_t i = 0; i < uniques.length; i++) { + grouped_slices[g[i]].push_back(i); } + offsets[0] = 0; for (size_t i = 0; i < grouped_slices.size(); i++) { - all_slices.insert(all_slices.end(), - std::make_move_iterator(grouped_slices[i].begin()), - std::make_move_iterator(grouped_slices[i].end())); - offsets[i + 1] = static_cast(all_slices.size()); + indices = std::copy(grouped_slices[i].begin(), grouped_slices[i].end(), indices); + offsets[i + 1] = offsets[i] + grouped_slices[i].size(); } - ARROW_ASSIGN_OR_RAISE(auto child_array, Concatenate(all_slices, ctx_->memory_pool())); + + ARROW_ASSIGN_OR_RAISE( + auto child_array, + Take(uniques[0], + Datum(ArrayData::Make(int32(), uniques.length, + {nullptr, std::move(take_indices)}, /*null_count=*/0)), + TakeOptions::NoBoundsCheck(), ctx_)); return ArrayData::Make(out_type(), num_groups_, {nullptr, std::move(offsets_buffer)}, - {child_array->data()}, + {child_array.array()}, /*null_count=*/0); } From 7403fdffc4596d1951c063e675529dac2b33e278 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 5 Aug 2021 09:11:54 -0400 Subject: [PATCH 05/11] ARROW-12728: [C++] Add cast for MSVC --- cpp/src/arrow/compute/kernels/hash_aggregate.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 5d7675b63b0..14867eb9e93 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -2050,7 +2050,7 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { offsets[0] = 0; for (size_t i = 0; i < grouped_slices.size(); i++) { indices = std::copy(grouped_slices[i].begin(), grouped_slices[i].end(), indices); - offsets[i + 1] = offsets[i] + grouped_slices[i].size(); + offsets[i + 1] = offsets[i] + static_cast(grouped_slices[i].size()); } ARROW_ASSIGN_OR_RAISE( From 566bf0c7b3101582770ff907f6020d7c0b7ab384 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 16 Aug 2021 09:47:06 -0400 Subject: [PATCH 06/11] ARROW-12728: [C++] Clean up tests --- .../compute/kernels/hash_aggregate_test.cc | 119 +++++++----------- 1 file changed, 46 insertions(+), 73 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 93b9e9216ea..df2222a4eef 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1281,24 +1281,21 @@ TEST(GroupBy, CountDistinct) { {"hash_count_distinct", nullptr}, }, use_threads)); + SortBy({"key_0"}, &aggregated_and_grouped); + ValidateOutput(aggregated_and_grouped); - { - // Order is not stable - EXPECT_EQ(4, aggregated_and_grouped.length()); - const int64_t* counts = - aggregated_and_grouped.array()->child_data[0]->GetValues(1); - const uint8_t* keys_valid = - aggregated_and_grouped.array()->child_data[1]->GetValues(0, 0); - const int64_t* keys = - aggregated_and_grouped.array()->child_data[1]->GetValues(1); - for (int i = 0; i < aggregated_and_grouped.length(); i++) { - if (BitUtil::GetBit(keys_valid, i)) { - EXPECT_EQ(keys[i], counts[i]); - } else { - EXPECT_EQ(4, counts[i]); - } - } - } + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_count_distinct", int64()), + field("key_0", int64()), + }), + R"([ + [1, 1], + [2, 2], + [3, 3], + [4, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); table = TableFromJSON(schema({field("argument", utf8()), field("key", int64())}), {R"([ @@ -1338,24 +1335,21 @@ TEST(GroupBy, CountDistinct) { {"hash_count_distinct", nullptr}, }, use_threads)); + ValidateOutput(aggregated_and_grouped); + SortBy({"key_0"}, &aggregated_and_grouped); - { - // Order is not stable - EXPECT_EQ(4, aggregated_and_grouped.length()); - const int64_t* counts = - aggregated_and_grouped.array()->child_data[0]->GetValues(1); - const uint8_t* keys_valid = - aggregated_and_grouped.array()->child_data[1]->GetValues(0, 0); - const int64_t* keys = - aggregated_and_grouped.array()->child_data[1]->GetValues(1); - for (int i = 0; i < aggregated_and_grouped.length(); i++) { - if (BitUtil::GetBit(keys_valid, i)) { - EXPECT_EQ(keys[i], counts[i]); - } else { - EXPECT_EQ(4, counts[i]); - } - } - } + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_count_distinct", int64()), + field("key_0", int64()), + }), + R"([ + [1, 1], + [2, 2], + [3, 3], + [4, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); } } @@ -1401,46 +1395,25 @@ TEST(GroupBy, Distinct) { {"hash_distinct", nullptr}, }, use_threads)); + ValidateOutput(aggregated_and_grouped); + SortBy({"key_0"}, &aggregated_and_grouped); - { - // Order is not stable - EXPECT_EQ(4, aggregated_and_grouped.length()); - const auto uniques = std::static_pointer_cast( - aggregated_and_grouped.array_as()->GetFieldByName( - "hash_distinct")); - const 
uint8_t* keys_valid = - aggregated_and_grouped.array()->child_data[1]->GetValues(0, 0); - const int64_t* keys = - aggregated_and_grouped.array()->child_data[1]->GetValues(1); - for (int i = 0; i < aggregated_and_grouped.length(); i++) { - auto values = std::static_pointer_cast(uniques->value_slice(i)); - std::vector c_values; - for (int i = 0; i < values->length(); i++) { - if (values->IsValid(i)) { - c_values.push_back(values->GetView(i)); - } - } - - if (BitUtil::GetBit(keys_valid, i)) { - EXPECT_EQ(keys[i], values->length()); - if (keys[i] == 1) { - EXPECT_EQ(0, values->null_count()); - EXPECT_THAT(c_values, ::testing::UnorderedElementsAreArray({"foo"})); - } else if (keys[i] == 2) { - EXPECT_EQ(0, values->null_count()); - EXPECT_THAT(c_values, ::testing::UnorderedElementsAreArray({"bar", "spam"})); - } else if (keys[i] == 3) { - EXPECT_EQ(1, values->null_count()); - EXPECT_THAT(c_values, ::testing::UnorderedElementsAreArray({"foo", "ham"})); - } - } else { - EXPECT_EQ(4, values->length()); - EXPECT_EQ(0, values->null_count()); - EXPECT_THAT(c_values, - ::testing::UnorderedElementsAreArray({"a", "b", "baz", "eggs"})); - } - } - } + // Order of sub-arrays is not stable + auto struct_arr = aggregated_and_grouped.array_as(); + auto distinct_arr = checked_pointer_cast(struct_arr->field(0)); + auto sort = [](const Array& arr) -> std::shared_ptr { + EXPECT_OK_AND_ASSIGN(auto indices, SortIndices(arr)); + EXPECT_OK_AND_ASSIGN(auto sorted, Take(arr, indices)); + return sorted.make_array(); + }; + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo"])"), + sort(*distinct_arr->value_slice(0)), /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["bar", "spam"])"), + sort(*distinct_arr->value_slice(1)), /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo", "ham", null])"), + sort(*distinct_arr->value_slice(2)), /*verbose=*/true); + AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["a", "b", "baz", "eggs"])"), + sort(*distinct_arr->value_slice(3)), /*verbose=*/true); } } From 35e30c8b8431e87d2b368a3f3d263684b937f21e Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 19 Aug 2021 16:50:16 -0400 Subject: [PATCH 07/11] ARROW-12728: [C++] Reuse MakeGroupings/ApplyGroupings --- .../arrow/compute/kernels/hash_aggregate.cc | 36 ++----------------- 1 file changed, 3 insertions(+), 33 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 14867eb9e93..b4526da96bd 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -2029,40 +2029,10 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { struct GroupedDistinctImpl : public GroupedCountDistinctImpl { Result Finalize() override { ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); - - // Assemble the final list via Take - - ARROW_ASSIGN_OR_RAISE(std::shared_ptr offsets_buffer, - AllocateBuffer((num_groups_ + 1) * sizeof(int32_t), pool_)); - int32_t* offsets = reinterpret_cast(offsets_buffer->mutable_data()); - - ARROW_ASSIGN_OR_RAISE(std::shared_ptr take_indices, - AllocateBuffer(uniques.length * sizeof(int64_t), pool_)); - int32_t* indices = reinterpret_cast(take_indices->mutable_data()); - - std::vector> grouped_slices(num_groups_); - - auto* g = uniques[1].array()->GetValues(1); - for (int32_t i = 0; i < uniques.length; i++) { - grouped_slices[g[i]].push_back(i); - } - - offsets[0] = 0; - for (size_t i = 0; i < grouped_slices.size(); i++) { - indices = 
std::copy(grouped_slices[i].begin(), grouped_slices[i].end(), indices); - offsets[i + 1] = offsets[i] + static_cast(grouped_slices[i].size()); - } - ARROW_ASSIGN_OR_RAISE( - auto child_array, - Take(uniques[0], - Datum(ArrayData::Make(int32(), uniques.length, - {nullptr, std::move(take_indices)}, /*null_count=*/0)), - TakeOptions::NoBoundsCheck(), ctx_)); - - return ArrayData::Make(out_type(), num_groups_, {nullptr, std::move(offsets_buffer)}, - {child_array.array()}, - /*null_count=*/0); + auto groupings, + grouper_->MakeGroupings(*uniques[1].array_as(), num_groups_, ctx_)); + return grouper_->ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_); } std::shared_ptr out_type() const override { return list(out_type_); } From 991c3393a4294ebd06d1ef939b4efc352e5d5c63 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 19 Aug 2021 16:53:53 -0400 Subject: [PATCH 08/11] ARROW-12728: [C++] Eagerly initialize grouper --- .../arrow/compute/kernels/hash_aggregate.cc | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index b4526da96bd..5ca26ec9637 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1979,9 +1979,6 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { } Status Consume(const ExecBatch& batch) override { - if (!grouper_) { - ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(batch.GetDescriptors(), ctx_)); - } return grouper_->Consume(batch).status(); } @@ -2024,6 +2021,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { MemoryPool* pool_; int64_t num_groups_; std::unique_ptr grouper_; + std::shared_ptr out_type_; }; struct GroupedDistinctImpl : public GroupedCountDistinctImpl { @@ -2036,14 +2034,16 @@ struct GroupedDistinctImpl : public GroupedCountDistinctImpl { } std::shared_ptr out_type() const override { return list(out_type_); } - - std::shared_ptr out_type_; }; +template Result> GroupedDistinctInit(KernelContext* ctx, const KernelInitArgs& args) { - ARROW_ASSIGN_OR_RAISE(auto impl, HashAggregateInit(ctx, args)); - static_cast(impl.get())->out_type_ = args.inputs[0].type; + ARROW_ASSIGN_OR_RAISE(auto impl, HashAggregateInit(ctx, args)); + auto instance = static_cast(impl.get()); + instance->out_type_ = args.inputs[0].type; + ARROW_ASSIGN_OR_RAISE(instance->grouper_, + Grouper::Make(args.inputs, ctx->exec_context())); return std::move(impl); } @@ -2512,14 +2512,15 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { auto func = std::make_shared( "hash_count_distinct", Arity::Binary(), &hash_count_distinct_doc); DCHECK_OK(func->AddKernel( - MakeKernel(ValueDescr::ARRAY, HashAggregateInit))); + MakeKernel(ValueDescr::ARRAY, GroupedDistinctInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } { auto func = std::make_shared("hash_distinct", Arity::Binary(), &hash_distinct_doc); - DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY, GroupedDistinctInit))); + DCHECK_OK(func->AddKernel( + MakeKernel(ValueDescr::ARRAY, GroupedDistinctInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } } From f5f4397010934dd9ee8766b5610217db7bb481a6 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 19 Aug 2021 17:38:52 -0400 Subject: [PATCH 09/11] ARROW-12728: [C++] Add cast for MSVC --- cpp/src/arrow/compute/kernels/hash_aggregate.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 5ca26ec9637..2df3c73c1b9 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -2027,9 +2027,9 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { struct GroupedDistinctImpl : public GroupedCountDistinctImpl { Result Finalize() override { ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques()); - ARROW_ASSIGN_OR_RAISE( - auto groupings, - grouper_->MakeGroupings(*uniques[1].array_as(), num_groups_, ctx_)); + ARROW_ASSIGN_OR_RAISE(auto groupings, grouper_->MakeGroupings( + *uniques[1].array_as(), + static_cast(num_groups_), ctx_)); return grouper_->ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_); } From bf9ce07f18a6425aa8e666aaf16ed5a1dc11e019 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 20 Aug 2021 14:53:37 -0400 Subject: [PATCH 10/11] ARROW-12728: [C++] Allocate new array for remapped group IDs --- cpp/src/arrow/compute/kernels/hash_aggregate.cc | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 2df3c73c1b9..d569631f05e 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1989,12 +1989,18 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { // Get (group_id, value) pairs, then translate the group IDs and consume them // ourselves ARROW_ASSIGN_OR_RAISE(auto uniques, other->grouper_->GetUniques()); + ARROW_ASSIGN_OR_RAISE(auto remapped_g, + AllocateBuffer(uniques.length * sizeof(uint32_t), pool_)); const auto* g_mapping = group_id_mapping.GetValues(1); - auto* other_g = uniques[1].array()->GetMutableValues(1); + const auto* other_g = uniques[1].array()->GetValues(1); + auto* g = reinterpret_cast(remapped_g->mutable_data()); + for (int64_t i = 0; i < uniques.length; i++) { - other_g[i] = g_mapping[other_g[i]]; + g[i] = g_mapping[other_g[i]]; } + uniques.values[1] = + ArrayData::Make(uint32(), uniques.length, {nullptr, std::move(remapped_g)}); return Consume(std::move(uniques)); } From 785bac76d75cd4580397a717c2a13cb686ceebe5 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 25 Aug 2021 12:18:06 -0400 Subject: [PATCH 11/11] ARROW-12728: [C++] Fix comment wording --- cpp/src/arrow/compute/kernels/hash_aggregate.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index d569631f05e..0a567e385e7 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1986,7 +1986,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { const ArrayData& group_id_mapping) override { auto other = checked_cast(&raw_other); - // Get (group_id, value) pairs, then translate the group IDs and consume them + // Get (value, group_id) pairs, then translate the group IDs and consume them // ourselves ARROW_ASSIGN_OR_RAISE(auto uniques, other->grouper_->GetUniques()); ARROW_ASSIGN_OR_RAISE(auto remapped_g,
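A minimal usage sketch of the new kernel, modeled on the tests added in hash_aggregate_test.cc above. This is illustrative only: TableFromJSON and internal::GroupBy are the test-side/internal helpers that appear in the diffs, not a stable public API, and the JSON input here is made up for the example.

    // Sketch only: mirrors the pattern used in hash_aggregate_test.cc.
    auto table = TableFromJSON(
        schema({field("argument", utf8()), field("key", int64())}),
        {R"([
          ["foo", 1],
          ["foo", 1],
          ["bar", 2],
          ["spam", 2],
          [null, 3],
          ["foo", 3]
        ])"});

    ASSERT_OK_AND_ASSIGN(
        Datum aggregated,
        internal::GroupBy({table->GetColumnByName("argument")},
                          {table->GetColumnByName("key")},
                          {{"hash_count_distinct", nullptr}},
                          /*use_threads=*/false));

    // The result is a StructArray with one column per aggregate plus the group key:
    //   hash_count_distinct: int64  -- 1 for key 1 ("foo"), 2 for key 2
    //                                  ("bar", "spam"), 2 for key 3 (null is counted)
    //   key_0:               int64
    // Row order is not stable, which is why the cleaned-up tests sort by key_0
    // (or sort each value slice, for hash_distinct) before asserting.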