-
Notifications
You must be signed in to change notification settings - Fork 4k
GH-34626: [C++] Add ordered/segmented aggregation Substrait extension #34627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @icexelloss |
|
Marking as a draft until testing is added. |
|
@westonpace, when do you think you could review the latest commit? |
| #include "substrait/algebra.pb.h" // IWYU pragma: export | ||
|
|
||
| namespace arrow { | ||
| namespace compute { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a forward declaration of Aggregate for the signature of ParseAggregateMeasure. However, we no longer need AggregateNodeOptions, so I'll remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need forward declaration instead of just importing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is conventional in Arrow to include a type declaration when its definitions is not needed, e.g., when only a pointer to the type is needed. You can see examples of this in "*_fwd.h" header files. I believe the basic idea of this is to minimize dependencies between header files. Having said that, I recently changed relation_internal.h here in a way that requires including the type definition and so I removed the declaration and included the defining header.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I believe if we don't expose these two methods from relation_internal.h then this we don't need this fwd declaration either. See #34627 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see now how we can cleanly avoid exposing the two functions in the "internal"-namespace without putting the function calling them in an inappropriate place. If we can do this cleanly, then I agree.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sth like this - e958b6d
I moved all things under relation_internal.cc which I think makes the header file cleaner.
The downside is that the extension logic is now inside relation_internal.cc which makes that file larger, but I still think this is justifiable because "aggregate" and "segment aggregate" does share internal logic here. This is cleaner than exposing the ParseAggregateMeasure and MakeAggregateDeclaration in relation_internal.h.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand what you're trying to achieve, however, I think this refactoring of options.cc should be out of scope for this PR and in any case should probably be into new files rather than into relation_internal.cc, which can't be the right place.
| ARROW_ENGINE_EXPORT | ||
| Result<compute::Expression> FromProto(const substrait::Expression::ReferenceSegment*, | ||
| const ExtensionSet&, const ConversionOptions&, | ||
| std::optional<compute::Expression>); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the expression here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the "current expression". This method is currently called int he middle of a deserializing an expression tree. So, for example:
flowchart TD
A[Call] -->|args| FieldRef
A -->|args| C
C[Call*] -->|args| D
C -->|Call| E
D[Literal]
E[FieldRef*]
So, when de-referencing FieldRef this will be Call and when dereferencing FieldRef* this will be Call*.
However, can we remove this prototype from the header file and put it in an anonymous namespace? I think it should be an internal method and not exposed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DirectReferenceFromProto is used by MakeSegmentedAggregateRel in options.cc, so I think it has to be in a header. I wouldn't say it's a public API, because it's in expression_internal.h.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Weston for the explanation.
+1 on not exposing this in the header file. I think if we move the MakeSegmentedAggregateRel to expression_internal.cc then we don't need to expose this via the header file. I have a comment about moving these "make extension rel" methods out of options.cc anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think expression_internal.cc, which deals with Substrait expression parsing, is the right place for any of the Make*Rel methods, which deal with Substrait extension handling.
IMO, refactoring of options.cc could be reasonable but is outside the scope of this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, can we remove this prototype from the header file and put it in an anonymous namespace? I think it should be an internal method and not exposed.
@westonpace Would sth like this look better to you? e958b6d
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@westonpace Would sth like this look better to you? e958b6d
I'm neutral on the change and don't want to spend too much time bike shedding here. If you're comfortable let's move forward.
DirectReferenceFromProto is used by MakeSegmentedAggregateRel in options.cc, so I think it has to be in a header.
Yes, I am fine with DirectReferenceFromProto. I was merely asking about the FromProto variant that takes a reference segment.
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took another look and caught something I had missed previously in the output schema.
| ARROW_ENGINE_EXPORT | ||
| Result<compute::Expression> FromProto(const substrait::Expression::ReferenceSegment*, | ||
| const ExtensionSet&, const ConversionOptions&, | ||
| std::optional<compute::Expression>); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the "current expression". This method is currently called int he middle of a deserializing an expression tree. So, for example:
flowchart TD
A[Call] -->|args| FieldRef
A -->|args| C
C[Call*] -->|args| D
C -->|Call| E
D[Literal]
E[FieldRef*]
So, when de-referencing FieldRef this will be Call and when dereferencing FieldRef* this will be Call*.
However, can we remove this prototype from the header file and put it in an anonymous namespace? I think it should be an internal method and not exposed.
| for (const auto& agg_src_fieldset : agg_src_fieldsets) { | ||
| for (int field : agg_src_fieldset) { | ||
| output_fields.emplace_back(input_schema->field(field)); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is correct.
- Values come after keys (note, this changed recently in [C++] Change Acero Aggregate output order #32897)
- You're inserting fields here for the inputs to the aggregates. However, the fields should be based on the outputs.
For example, consider the query is SELECT SUM(x), COVARIANCE(x,y) GROUP BY key and the input schema is {key: int32, x: float32, y: float32}
I believe this method would return {x: float32, x: float32, y: float32, key: int32}. Instead it should return { key: int32, sum(x): float64, covariance(x, y): float64}.
Note, the names aren't really important (and probably difficult to recreate). So it'd be fine to do {key0: int32, measure0: float64, measure1: float64}. However, we do need to get the types and # of output fields correct or else it will mess up any project relations added after this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is basically refactoring of original code (followed by adding similar code for segment_key_field_ids). Does that mean the original code is incorrect too? That would be surprising.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@westonpace I agree this looks strange/incorrect but it does seem to be same as existing code:
https://github.com/apache/arrow/blob/main/cpp/src/arrow/engine/substrait/relation_internal.cc#L771
Do you prefer try to get to bottom of this in this PR or leave as follow up? I am fine either way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can leave for a follow-up. Yes, I still think it's incorrect. Following up an aggregate with a projection had been broken for a long time due to the ordering issue. So it wouldn't surprise me if this has always been broken.
| /// \param[in] is_hash whether the measure is a hash one (i.e., aggregation keys exist) | ||
| /// \param[out] aggregates points to vector to push the parsed measure into | ||
| /// \param[out] agg_src_fieldsets points to vector to push the parsed field set into | ||
| ARROW_ENGINE_EXPORT Status ParseAggregateMeasure( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love these methods. It's not clear why they are public and they have a lot of arguments. However, I can understand why they are here.
ParseAggregateMeasure is probably inevitable. Although, I wonder if it might be easier to understand if you return:
struct ParsedMeasure {
compute::Aggregate aggregate;
std::vector<int> fieldset;
std::shared_ptr<DataType> output_type;
};
Result<ParsedMeasure> ParseAggregateMeasure(...);
I think we could do away with MakeAggregateDeclaration if we had a better way of computing the output schema given an input schema and a compute::AggregateNodeOptions. I started trying to make a compute::AggregateNodeOptions::CalculateOutputSchema(const Schema& input_schema) but it turns out it's rather tricky to determine the output types of measures once we have left Substrait. So, for the moment, I think we can leave this as-is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps something like the example of AsofJoinNode::MakeOutputSchema is what you're looking for? Though I'd strongly prefer to defer this refactoring to another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we need to keep these methods as well, I think we can make them internal methods of relation_internal.cc and remove them from the header.
|
@westonpace I believe the two main remaining items that block PR moving forward are: (1) https://github.com/apache/arrow/pull/34627/files#r1151191726 Your comment here about not exposing these methods in the header. I agree with this and made an attempt here: I am able to move those methods internal to I feel this is a bit cleaner than what is currently in the PR. If you are OK with this I can update the PR with the change. If you want some other way to resolve this / or leave it as is in the PR let me know as well. (2) #34627 (comment) This is potentially a bug / correctness issue. Given this is not introduced in this PR I am leaning towards debug this as follow up (I actually suspect this could cause some internal issues and asked @rtpsw to test but I also don't want to drag this PR for too long). In any case, I would like to move forward with this PR so please let me know your preference of the two issues above or any other issue that I missed. |
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@icexelloss I agree with your summary.
The header / inclusion issue is minor and we don't need to fix it if there is no obvious solution. I do think the schema thing is a problem but, given it was already broken, fixing it in a future PR seems fine to me. We can focus on adding some test cases. I'll create a github issue.
|
Thanks @westonpace. In that case I will go ahead and merge this. There is definitely room for clean up header/inclusion I would create a follow up to clean this up interest of time. |
icexelloss
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Benchmark runs are scheduled for baseline = ddd0a33 and contender = 660d259. 660d259 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
|
['Python', 'R'] benchmarks have high level of regressions. |
…ension (apache#34627) See apache#34626 * Closes: apache#34626 Authored-by: Yaron Gvili <rtpsw@hotmail.com> Signed-off-by: Li Jin <ice.xelloss@gmail.com>
See #34626