From a935983456f304240370db249f887c31d2c89484 Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Thu, 6 Apr 2023 08:45:11 -0400
Subject: [PATCH] GH-34930: [C++] Standardize aggregation column order

---
 cpp/src/arrow/acero/aggregate_node.cc        | 27 ++++++++++----------
 cpp/src/arrow/acero/hash_aggregate_test.cc   | 35 ++++++++++----------
 cpp/src/arrow/acero/plan_test.cc             |  4 +--
 cpp/src/arrow/engine/substrait/serde_test.cc |  2 +-
 4 files changed, 34 insertions(+), 34 deletions(-)

diff --git a/cpp/src/arrow/acero/aggregate_node.cc b/cpp/src/arrow/acero/aggregate_node.cc
index bd97235df65..0fb429d8234 100644
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ b/cpp/src/arrow/acero/aggregate_node.cc
@@ -651,19 +651,19 @@ class GroupByNode : public ExecNode, public TracedNode {
     // Build field vector for output schema
     FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};
 
-    // First output is keys, followed by segment_keys, followed by aggregates themselves
+    // First output is segment keys, followed by keys, followed by aggregates themselves
     // This matches the behavior described by Substrait and also tends to be the behavior
     // in SQL engines
-    for (size_t i = 0; i < keys.size(); ++i) {
-      int key_field_id = key_field_ids[i];
-      output_fields[i] = input_schema->field(key_field_id);
-    }
-    size_t base = keys.size();
     for (size_t i = 0; i < segment_keys.size(); ++i) {
       int segment_key_field_id = segment_key_field_ids[i];
-      output_fields[base + i] = input_schema->field(segment_key_field_id);
+      output_fields[i] = input_schema->field(segment_key_field_id);
+    }
+    size_t base = segment_keys.size();
+    for (size_t i = 0; i < keys.size(); ++i) {
+      int key_field_id = key_field_ids[i];
+      output_fields[base + i] = input_schema->field(key_field_id);
     }
-    base += segment_keys.size();
+    base += keys.size();
     for (size_t i = 0; i < aggs.size(); ++i) {
       output_fields[base + i] =
           agg_result_fields[i]->WithName(aggregate_options.aggregates[i].name);
@@ -781,11 +781,12 @@ class GroupByNode : public ExecNode, public TracedNode {
     out_data.values.resize(agg_kernels_.size() + key_field_ids_.size() +
                            segment_key_field_ids_.size());
 
-    // Keys come first
-    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
-    std::move(out_keys.values.begin(), out_keys.values.end(), out_data.values.begin());
-    // Followed by segment keys
-    PlaceFields(out_data, key_field_ids_.size(), segmenter_values_);
+    // Segment keys come first
+    PlaceFields(out_data, 0, segmenter_values_);
+    // Followed by keys
+    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
+    std::move(out_keys.values.begin(), out_keys.values.end(),
+              out_data.values.begin() + segment_key_field_ids_.size());
     // And finally, the aggregates themselves
     std::size_t base = segment_key_field_ids_.size() + key_field_ids_.size();
     for (size_t i = 0; i < agg_kernels_.size(); ++i) {
diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc b/cpp/src/arrow/acero/hash_aggregate_test.cc
index 0ae06d05728..2fe59b8bfa3 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -4703,8 +4703,8 @@ void TestSegmentKey(GroupByFunction group_by, const std::shared_ptr<Table>& tabl
 }
 
 Result<std::shared_ptr<Table>> GetSingleSegmentInputAsChunked() {
-  auto table = TableFromJSON(schema({field("argument", float64()), field("key", int64()),
-                                     field("segment_key", int64())}),
+  auto table = TableFromJSON(schema({field("segment_key", int64()), field("key", int64()),
+                                     field("argument", float64())}),
                              {R"([{"argument": 1.0,   "key": 1,    "segment_key": 1},
                                   {"argument": null,  "key": 1,    "segment_key": 1}
"key": 1, "segment_key": 1} ])", @@ -4757,8 +4757,8 @@ Result> GetSingleSegmentScalarOutput() { Result> GetSingleSegmentKeyOutput() { return ChunkedArrayFromJSON(struct_({ - field("key_0", int64()), field("key_1", int64()), + field("key_0", int64()), field("hash_count", int64()), field("hash_sum", float64()), field("hash_min_max", struct_({ @@ -4767,16 +4767,16 @@ Result> GetSingleSegmentKeyOutput() { })), }), {R"([ - [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ], - [2, 1, 3, -0.125, {"min": -0.25, "max": 0.125}], - [3, 1, 0, null, {"min": null, "max": null} ], - [null, 1, 2, 4.75, {"min": 0.75, "max": 4.0} ] + [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ], + [1, 2, 3, -0.125, {"min": -0.25, "max": 0.125}], + [1, 3, 0, null, {"min": null, "max": null} ], + [1, null, 2, 4.75, {"min": 0.75, "max": 4.0} ] ])", R"([ - [1, 0, 2, 4.25, {"min": 1.0, "max": 3.25} ], - [2, 0, 3, -0.125, {"min": -0.25, "max": 0.125}], - [3, 0, 0, null, {"min": null, "max": null} ], - [null, 0, 2, 4.75, {"min": 0.75, "max": 4.0} ] + [0, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ], + [0, 2, 3, -0.125, {"min": -0.25, "max": 0.125}], + [0, 3, 0, null, {"min": null, "max": null} ], + [0, null, 2, 4.75, {"min": 0.75, "max": 4.0} ] ])"}); } @@ -4833,7 +4833,7 @@ Result> GetEmptySegmentKeysInputAsCombined() { Result> GetEmptySegmentKeyOutput() { ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput()); ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked)); - ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(1)); + ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(0)); auto sliced = removed->Slice(0, 4); ARROW_ASSIGN_OR_RAISE(auto batch, sliced->CombineChunksToBatch()); return batch->ToStructArray(); @@ -4854,14 +4854,13 @@ TEST_P(SegmentedKeyGroupBy, EmptySegmentKeyCombined) { TestEmptySegmentKey(GetParam(), GetEmptySegmentKeysInputAsCombined); } -// adds a named copy of the last (single-segment-key) column to the obtained table +// adds a named copy of the first (single-segment-key) column to the obtained table Result> GetMultiSegmentInput( std::function>()> get_table, const std::string& add_name) { ARROW_ASSIGN_OR_RAISE(auto table, get_table()); - int last = table->num_columns() - 1; - auto add_field = field(add_name, table->schema()->field(last)->type()); - return table->AddColumn(table->num_columns(), add_field, table->column(last)); + auto add_field = field(add_name, table->schema()->field(0)->type()); + return table->AddColumn(table->num_columns(), add_field, table->column(0)); } Result> GetMultiSegmentInputAsChunked( @@ -4874,12 +4873,12 @@ Result> GetMultiSegmentInputAsCombined( return GetMultiSegmentInput(GetSingleSegmentInputAsCombined, add_name); } -// adds a named copy of the last (single-segment-key) column to the expected output table +// adds a named copy of the first(single-segment-key) column to the expected output table Result> GetMultiSegmentKeyOutput( const std::string& add_name) { ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput()); ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked)); - int existing_key_field_idx = 1; + int existing_key_field_idx = 0; auto add_field = field(add_name, table->schema()->field(existing_key_field_idx)->type()); ARROW_ASSIGN_OR_RAISE(auto added, diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index 3ce2ba2b8cc..d6e18c3bb71 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -1640,7 +1640,7 @@ TEST(ExecPlanExecution, 
@@ -1640,7 +1640,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
                        DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
 
   auto expected = ExecBatchFromJSON({int32(), int32(), int64(), float64()},
-                                    R"([[1, 1, 6, 2], [2, 1, 6, 2]])");
+                                    R"([[1, 1, 6, 2], [1, 2, 6, 2]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
@@ -1670,7 +1670,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {
 
   auto expected = ExecBatchFromJSON(
       {int32(), int32(), int64(), float64()},
-      R"([[1, 1, 3, 1.5], [2, 1, 1, 1], [1, 2, 3, 3], [2, 2, 5, 2.5]])");
+      R"([[1, 1, 3, 1.5], [1, 2, 1, 1], [2, 1, 3, 3], [2, 2, 5, 2.5]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index ca96b2bf7c8..b84bde8080e 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -5726,7 +5726,7 @@ TEST(Substrait, PlanWithSegmentedAggregateExtension) {
   std::shared_ptr<Schema> output_schema =
       schema({field("k", int32()), field("t", int32()), field("v", float64())});
   auto expected_table =
-      TableFromJSON(output_schema, {"[[1, 1, 4], [2, 1, 2], [2, 2, 10], [1, 2, 5]]"});
+      TableFromJSON(output_schema, {"[[1, 1, 4], [1, 2, 2], [2, 2, 10], [2, 1, 5]]"});
 
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }
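
Illustration (not part of the patch): the sketch below shows the column order this
change standardizes on -- segment keys first, then grouping keys, then aggregates --
through Acero's public Declaration API. The input table, the column names "sk", "k",
and "v", and the RunExample() helper are invented for this example, and it assumes an
Arrow build that already includes this change (arrow::acero with segment_keys support
in AggregateNodeOptions).

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>
#include <arrow/compute/api_aggregate.h>

#include <iostream>

arrow::Status RunExample() {
  // Build a small input table: sk (segment key), k (key), v (value).
  arrow::Int64Builder sk_builder, k_builder;
  arrow::DoubleBuilder v_builder;
  ARROW_RETURN_NOT_OK(sk_builder.AppendValues({1, 1, 2, 2}));
  ARROW_RETURN_NOT_OK(k_builder.AppendValues({1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(v_builder.AppendValues({1.0, 2.0, 3.0, 4.0}));
  ARROW_ASSIGN_OR_RAISE(auto sk, sk_builder.Finish());
  ARROW_ASSIGN_OR_RAISE(auto k, k_builder.Finish());
  ARROW_ASSIGN_OR_RAISE(auto v, v_builder.Finish());
  auto input_schema = arrow::schema({arrow::field("sk", arrow::int64()),
                                     arrow::field("k", arrow::int64()),
                                     arrow::field("v", arrow::float64())});
  auto table = arrow::Table::Make(input_schema, {sk, k, v});

  // Sum "v" grouped by key "k", segmented by "sk".
  arrow::acero::Declaration source{"table_source",
                                   arrow::acero::TableSourceNodeOptions{table}};
  arrow::acero::Declaration aggregate{
      "aggregate",
      arrow::acero::AggregateNodeOptions{
          /*aggregates=*/{{"hash_sum", nullptr, arrow::FieldRef("v"), "sum(v)"}},
          /*keys=*/{arrow::FieldRef("k")},
          /*segment_keys=*/{arrow::FieldRef("sk")}}};
  auto plan =
      arrow::acero::Declaration::Sequence({std::move(source), std::move(aggregate)});
  // Segmented aggregation requires serial execution, hence use_threads=false.
  ARROW_ASSIGN_OR_RAISE(auto result, arrow::acero::DeclarationToTable(
                                         std::move(plan), /*use_threads=*/false));

  // With this patch the output columns arrive as
  // [sk (segment key), k (key), sum(v) (aggregate)];
  // before it they arrived as [k, sk, sum(v)].
  std::cout << result->schema()->ToString() << std::endl;
  return arrow::Status::OK();
}

int main() { return RunExample().ok() ? 0 : 1; }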