25 changes: 13 additions & 12 deletions cpp/src/arrow/acero/aggregate_node.cc
@@ -651,19 +651,19 @@ class GroupByNode : public ExecNode, public TracedNode {
     // Build field vector for output schema
     FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};

-    // First output is keys, followed by segment_keys, followed by aggregates themselves
+    // First output is segment keys, followed by keys, followed by aggregates themselves
     // This matches the behavior described by Substrait and also tends to be the behavior
     // in SQL engines
-    for (size_t i = 0; i < keys.size(); ++i) {
-      int key_field_id = key_field_ids[i];
-      output_fields[i] = input_schema->field(key_field_id);
-    }
-    size_t base = keys.size();
     for (size_t i = 0; i < segment_keys.size(); ++i) {
       int segment_key_field_id = segment_key_field_ids[i];
-      output_fields[base + i] = input_schema->field(segment_key_field_id);
+      output_fields[i] = input_schema->field(segment_key_field_id);
     }
-    base += segment_keys.size();
+    size_t base = segment_keys.size();
+    for (size_t i = 0; i < keys.size(); ++i) {
+      int key_field_id = key_field_ids[i];
+      output_fields[base + i] = input_schema->field(key_field_id);
+    }
+    base += keys.size();
     for (size_t i = 0; i < aggs.size(); ++i) {
       output_fields[base + i] =
           agg_result_fields[i]->WithName(aggregate_options.aggregates[i].name);
@@ -781,11 +781,12 @@ class GroupByNode : public ExecNode, public TracedNode {
     out_data.values.resize(agg_kernels_.size() + key_field_ids_.size() +
                            segment_key_field_ids_.size());

-    // Keys come first
-    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
-    std::move(out_keys.values.begin(), out_keys.values.end(), out_data.values.begin());
-    // Followed by segment keys
-    PlaceFields(out_data, key_field_ids_.size(), segmenter_values_);
+    // Segment keys come first
+    PlaceFields(out_data, 0, segmenter_values_);
+    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
+    std::move(out_keys.values.begin(), out_keys.values.end(),
+              out_data.values.begin() + segment_key_field_ids_.size());
     // And finally, the aggregates themselves
     std::size_t base = segment_key_field_ids_.size() + key_field_ids_.size();
     for (size_t i = 0; i < agg_kernels_.size(); ++i) {
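To make the reordering concrete: the two hunks above lay the output buffer out as [segment keys..., keys..., aggregates...]. The following is a minimal standalone sketch of that index arithmetic in plain C++; the column names are hypothetical and no Arrow types are involved, so it only mirrors the slot bookkeeping of the node, not the node itself.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main() {
  // Hypothetical column names; the real node works with a FieldVector and
  // field ids, but the layout logic is the same as in the hunks above.
  std::vector<std::string> segment_keys = {"segment_key"};
  std::vector<std::string> keys = {"key"};
  std::vector<std::string> aggs = {"hash_count"};

  std::vector<std::string> output_fields(segment_keys.size() + keys.size() + aggs.size());

  // Segment keys occupy the leading slots [0, segment_keys.size()).
  for (std::size_t i = 0; i < segment_keys.size(); ++i) {
    output_fields[i] = segment_keys[i];
  }
  // Keys follow at [segment_keys.size(), segment_keys.size() + keys.size()).
  std::size_t base = segment_keys.size();
  for (std::size_t i = 0; i < keys.size(); ++i) {
    output_fields[base + i] = keys[i];
  }
  // Aggregates fill the remaining slots.
  base += keys.size();
  for (std::size_t i = 0; i < aggs.size(); ++i) {
    output_fields[base + i] = aggs[i];
  }

  // Prints, one per line: segment_key, key, hash_count
  for (const auto& name : output_fields) {
    std::cout << name << "\n";
  }
  return 0;
}
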
35 changes: 17 additions & 18 deletions cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -4703,8 +4703,8 @@ void TestSegmentKey(GroupByFunction group_by, const std::shared_ptr<Table>& tabl
 }

 Result<std::shared_ptr<Table>> GetSingleSegmentInputAsChunked() {
-  auto table = TableFromJSON(schema({field("argument", float64()), field("key", int64()),
-                                     field("segment_key", int64())}),
+  auto table = TableFromJSON(schema({field("segment_key", int64()), field("key", int64()),
+                                     field("argument", float64())}),
                              {R"([{"argument": 1.0, "key": 1, "segment_key": 1},
                                   {"argument": null, "key": 1, "segment_key": 1}
                              ])",
@@ -4757,8 +4757,8 @@ Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentScalarOutput() {

 Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentKeyOutput() {
   return ChunkedArrayFromJSON(struct_({
-                                  field("key_0", int64()),
                                   field("key_1", int64()),
+                                  field("key_0", int64()),
                                   field("hash_count", int64()),
                                   field("hash_sum", float64()),
                                   field("hash_min_max", struct_({
@@ -4767,16 +4767,16 @@ Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentKeyOutput() {
                                  })),
                              }),
                              {R"([
-    [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ],
-    [2, 1, 3, -0.125, {"min": -0.25, "max": 0.125}],
-    [3, 1, 0, null, {"min": null, "max": null} ],
-    [null, 1, 2, 4.75, {"min": 0.75, "max": 4.0} ]
+    [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ],
+    [1, 2, 3, -0.125, {"min": -0.25, "max": 0.125}],
+    [1, 3, 0, null, {"min": null, "max": null} ],
+    [1, null, 2, 4.75, {"min": 0.75, "max": 4.0} ]
 ])",
 R"([
-    [1, 0, 2, 4.25, {"min": 1.0, "max": 3.25} ],
-    [2, 0, 3, -0.125, {"min": -0.25, "max": 0.125}],
-    [3, 0, 0, null, {"min": null, "max": null} ],
-    [null, 0, 2, 4.75, {"min": 0.75, "max": 4.0} ]
+    [0, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ],
+    [0, 2, 3, -0.125, {"min": -0.25, "max": 0.125}],
+    [0, 3, 0, null, {"min": null, "max": null} ],
+    [0, null, 2, 4.75, {"min": 0.75, "max": 4.0} ]
 ])"});
 }

@@ -4833,7 +4833,7 @@ Result<std::shared_ptr<Table>> GetEmptySegmentKeysInputAsCombined() {
 Result<std::shared_ptr<Array>> GetEmptySegmentKeyOutput() {
   ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
   ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
-  ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(1));
+  ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(0));
   auto sliced = removed->Slice(0, 4);
   ARROW_ASSIGN_OR_RAISE(auto batch, sliced->CombineChunksToBatch());
   return batch->ToStructArray();
@@ -4854,14 +4854,13 @@ TEST_P(SegmentedKeyGroupBy, EmptySegmentKeyCombined) {
   TestEmptySegmentKey(GetParam(), GetEmptySegmentKeysInputAsCombined);
 }

-// adds a named copy of the last (single-segment-key) column to the obtained table
+// adds a named copy of the first (single-segment-key) column to the obtained table
 Result<std::shared_ptr<Table>> GetMultiSegmentInput(
     std::function<Result<std::shared_ptr<Table>>()> get_table,
     const std::string& add_name) {
   ARROW_ASSIGN_OR_RAISE(auto table, get_table());
-  int last = table->num_columns() - 1;
-  auto add_field = field(add_name, table->schema()->field(last)->type());
-  return table->AddColumn(table->num_columns(), add_field, table->column(last));
+  auto add_field = field(add_name, table->schema()->field(0)->type());
+  return table->AddColumn(table->num_columns(), add_field, table->column(0));
 }

 Result<std::shared_ptr<Table>> GetMultiSegmentInputAsChunked(
@@ -4874,12 +4873,12 @@ Result<std::shared_ptr<Table>> GetMultiSegmentInputAsCombined(
   return GetMultiSegmentInput(GetSingleSegmentInputAsCombined, add_name);
 }

-// adds a named copy of the last (single-segment-key) column to the expected output table
+// adds a named copy of the first (single-segment-key) column to the expected output table
 Result<std::shared_ptr<ChunkedArray>> GetMultiSegmentKeyOutput(
     const std::string& add_name) {
   ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
   ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
-  int existing_key_field_idx = 1;
+  int existing_key_field_idx = 0;
   auto add_field =
       field(add_name, table->schema()->field(existing_key_field_idx)->type());
   ARROW_ASSIGN_OR_RAISE(auto added,
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/plan_test.cc
@@ -1640,7 +1640,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
                        DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));

   auto expected = ExecBatchFromJSON({int32(), int32(), int64(), float64()},
-                                    R"([[1, 1, 6, 2], [2, 1, 6, 2]])");
+                                    R"([[1, 1, 6, 2], [1, 2, 6, 2]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
@@ -1670,7 +1670,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {

   auto expected = ExecBatchFromJSON(
       {int32(), int32(), int64(), float64()},
-      R"([[1, 1, 3, 1.5], [2, 1, 1, 1], [1, 2, 3, 3], [2, 2, 5, 2.5]])");
+      R"([[1, 1, 3, 1.5], [1, 2, 1, 1], [2, 1, 3, 3], [2, 2, 5, 2.5]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
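For orientation, here is a rough usage sketch of how the reordered output reaches a caller through Acero. This is an illustration with assumed details, not code from this PR: the table and column names ("segment_key", "key", "argument") are invented, and the Declaration / AggregateNodeOptions / DeclarationToTable usage is quoted from memory, so headers and signatures may need adjusting for a given Arrow version.

// Hypothetical end-to-end sketch; names and exact signatures are assumptions.
#include <memory>
#include <utility>
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/result.h>
#include <arrow/table.h>

namespace ac = arrow::acero;

// Groups `table` (assumed columns: "segment_key", "key", "argument") by "key",
// segmented on "segment_key", counting "argument" within each group.
arrow::Result<std::shared_ptr<arrow::Table>> SegmentedCount(
    const std::shared_ptr<arrow::Table>& table) {
  ac::Declaration plan = ac::Declaration::Sequence(
      {{"table_source", ac::TableSourceNodeOptions(table)},
       {"aggregate",
        ac::AggregateNodeOptions(
            /*aggregates=*/{{"hash_count", nullptr, "argument", "hash_count"}},
            /*keys=*/{"key"},
            /*segment_keys=*/{"segment_key"})}});
  // With this change, the result columns come back as
  //   [segment_key, key, hash_count]
  // rather than [key, segment_key, hash_count].
  return ac::DeclarationToTable(std::move(plan));
}

The plan_test.cc expectations above encode the same ordering: after the change, the first int32 column of each expected row carries the segment key value and the second carries the group key.
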
2 changes: 1 addition & 1 deletion cpp/src/arrow/engine/substrait/serde_test.cc
@@ -5726,7 +5726,7 @@ TEST(Substrait, PlanWithSegmentedAggregateExtension) {
   std::shared_ptr<Schema> output_schema =
       schema({field("k", int32()), field("t", int32()), field("v", float64())});
   auto expected_table =
-      TableFromJSON(output_schema, {"[[1, 1, 4], [2, 1, 2], [2, 2, 10], [1, 2, 5]]"});
+      TableFromJSON(output_schema, {"[[1, 1, 4], [1, 2, 2], [2, 2, 10], [2, 1, 5]]"});
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }
