-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Support avg(distinct) for float64 type
#17255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… and remove unused sum_distinct module
This reverts commit 1fa2db8.
| Ok(vec![ | ||
| Field::new( | ||
| format_state_name(args.name, "count"), | ||
| DataType::UInt64, | ||
| true, | ||
| ), | ||
| Field::new( | ||
| format_state_name(args.name, "sum"), | ||
| args.input_fields[0].data_type().clone(), | ||
| true, | ||
| ), | ||
| ] | ||
| .into_iter() | ||
| .map(Arc::new) | ||
| .collect()) | ||
| if args.is_distinct { | ||
| // Copied from datafusion_functions_aggregate::sum::Sum::state_fields | ||
| // since the accumulator uses DistinctSumAccumulator internally. | ||
| Ok(vec![Field::new_list( | ||
| format_state_name(args.name, "sum distinct"), | ||
| Field::new_list_field(args.return_type().clone(), true), | ||
| false, | ||
| ) | ||
| .into()]) | ||
| } else { | ||
| Ok(vec![ | ||
| Field::new( | ||
| format_state_name(args.name, "count"), | ||
| DataType::UInt64, | ||
| true, | ||
| ), | ||
| Field::new( | ||
| format_state_name(args.name, "sum"), | ||
| args.input_fields[0].data_type().clone(), | ||
| true, | ||
| ), | ||
| ] | ||
| .into_iter() | ||
| .map(Arc::new) | ||
| .collect()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is my main contribution in addition to the changes from the original PR, to fix that error with the differing field counts. I wonder if there's a better way to architect this, since wasn't clearly obvious that this is related to the state of the accumulator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand this question -- the PR's code looks good to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, what I meant was that in accumulators we can return the state() like so:
datafusion/datafusion/functions-aggregate/src/sum.rs
Lines 432 to 450 in be3842b
| fn state(&mut self) -> Result<Vec<ScalarValue>> { | |
| // 1. Stores aggregate state in `ScalarValue::List` | |
| // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set | |
| let state_out = { | |
| let distinct_values = self | |
| .values | |
| .iter() | |
| .map(|value| { | |
| ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type) | |
| }) | |
| .collect::<Result<Vec<_>>>()?; | |
| vec![ScalarValue::List(ScalarValue::new_list_nullable( | |
| &distinct_values, | |
| &self.data_type, | |
| ))] | |
| }; | |
| Ok(state_out) | |
| } |
However this must align with state_fields() of the parent aggregate UDF:
datafusion/datafusion/functions-aggregate/src/sum.rs
Lines 204 to 221 in be3842b
| fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> { | |
| if args.is_distinct { | |
| Ok(vec![Field::new_list( | |
| format_state_name(args.name, "sum distinct"), | |
| // See COMMENTS.md to understand why nullable is set to true | |
| Field::new_list_field(args.return_type().clone(), true), | |
| false, | |
| ) | |
| .into()]) | |
| } else { | |
| Ok(vec![Field::new( | |
| format_state_name(args.name, "sum"), | |
| args.return_type().clone(), | |
| true, | |
| ) | |
| .into()]) | |
| } | |
| } |
But this isn't clearly obvious at compile time, and during runtime we only hit this issue for certain test cases (for this distinct avg PR). So I was wondering if there was a better way to enforce this at compile time. Hope that clears it up.
|
Looks like still getting same error for some of the extended tests: Will look into this Edit: seems to be related to group accumulator support |
| args.return_field.data_type(), | ||
| DataType::Float64 | DataType::Decimal128(_, _) | DataType::Duration(_) | ||
| ) | ||
| ) && !args.is_distinct |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to how sum handles it:
datafusion/datafusion/functions-aggregate/src/sum.rs
Lines 223 to 225 in 8c58f53
| fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { | |
| !args.is_distinct | |
| } |
Fixed by bc121fb |
|
This will I assume require regenerating the extended slt files in datafusion-testing? |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @Jefffrey -- I think the Pr looks quite good. I think it should just have a few more tests, but otherwise 👌
| (1, 1), | ||
| (2, 2), | ||
| (3, 3), | ||
| (4, 4), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you update this test so:
- The input isn't in order
- Add a test for floating point values
- Test for an input that includes at least one null value
- the values in
bare different than the values inb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will work on adding these cases 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed those points in this commit: 3abb4b7
| Ok(vec![ | ||
| Field::new( | ||
| format_state_name(args.name, "count"), | ||
| DataType::UInt64, | ||
| true, | ||
| ), | ||
| Field::new( | ||
| format_state_name(args.name, "sum"), | ||
| args.input_fields[0].data_type().clone(), | ||
| true, | ||
| ), | ||
| ] | ||
| .into_iter() | ||
| .map(Arc::new) | ||
| .collect()) | ||
| if args.is_distinct { | ||
| // Copied from datafusion_functions_aggregate::sum::Sum::state_fields | ||
| // since the accumulator uses DistinctSumAccumulator internally. | ||
| Ok(vec![Field::new_list( | ||
| format_state_name(args.name, "sum distinct"), | ||
| Field::new_list_field(args.return_type().clone(), true), | ||
| false, | ||
| ) | ||
| .into()]) | ||
| } else { | ||
| Ok(vec![ | ||
| Field::new( | ||
| format_state_name(args.name, "count"), | ||
| DataType::UInt64, | ||
| true, | ||
| ), | ||
| Field::new( | ||
| format_state_name(args.name, "sum"), | ||
| args.input_fields[0].data_type().clone(), | ||
| true, | ||
| ), | ||
| ] | ||
| .into_iter() | ||
| .map(Arc::new) | ||
| .collect()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand this question -- the PR's code looks good to me
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Yep, refer to apache/datafusion-testing#11 |
|
I think all we need now is to merge apache/datafusion-testing#11 and then update the I am testing locally with the changes from apache/datafusion-testing#11 using: INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests |
| SELECT array_agg(a_varchar order by a_varchar) WITHIN GROUP (ORDER BY a_varchar) | ||
| FROM (VALUES ('a'), ('d'), ('c'), ('a')) t(a_varchar); | ||
|
|
||
| # distinct average |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
👍 |
|
I updated the pin and merged up from main |
Cheers 👍 |
|
🚀 |
Which issue does this PR close?
Relates to #2408
Rationale for this change
Building on old PR #15413, to get it over the line. Opened as new PR as other PR was too old and wasn't sure if should push to the original author branch; for now pushed to my own branch but preserved the original commits.
What changes are included in this PR?
From original PR:
DistinctSumAccumulatorto common so that it can be used inFloat64DistinctAvgAccumulatorFloat64DistinctAvgAccumulatorusingDistinctSumAccumulatoraggregate.sltAdditional changes made by me:
query error DataFusion error: Arrow error: Invalid argument error: number of columns\(1\) must match number of fields\(2\) in schema(Support Avg distinct forfloat64type #15413 (comment)) which was caused by wrong state fields (and also but not disabling group accumulator support if distinct)Are these changes tested?
Added SLT tests, and also regenerated the extended tests: apache/datafusion-testing#11
Are there any user-facing changes?
Not sure if doc changes are required, as this is mainly for SQL and it seems we don't explicitly say
avg(distinct)is disallowed so don't need to update anything saying it works now?