feat: support aggregate in scanner #5911
Code Review Summary

This PR adds Substrait aggregate pushdown support to Lance, enabling query engines like Trino to push aggregates (COUNT, SUM, AVG, MIN, MAX with GROUP BY) down to the storage layer.

P0/P1 Issues

P1: Test assertion is a no-op (line 311 in substrait.rs)

assert!(result.is_ok() || result.is_err());

This assertion always passes. Either remove the test or make it meaningful by testing specific expected behavior (e.g., assert the specific error type when extensions are missing).

P1: Memory allocation on hot path (scanner.rs:1382)

self.substrait_aggregate = Some(aggregate_rel.to_vec());

P1: Incomplete implementation marker (substrait.rs:231)

let order_by = Vec::new(); // TODO: parse agg_func.sorts if needed

Minor Observations (not blocking)
Overall: Good test coverage with 14 tests covering various aggregate scenarios. The implementation correctly leverages DataFusion's physical planning and Substrait parsing infrastructure.
Force-pushed from afc2170 to cf4c194
Add support for aggregates via Substrait AggregateRel specification.

Key changes:
- Add `AggregateSpec` enum with Substrait and Datafusion variants
- Add `aggregate_substrait()` and `aggregate_expr()` methods to Scanner
- Add `create_aggregate_plan()` to build execution plan with AggregateExec
- Add Substrait parsing utilities in lance-datafusion for AggregateRel
- Implement type coercion for UserDefined signature functions (e.g., AVG)
- Support output column aliases via RelRoot.names

Supported: COUNT, SUM, AVG, MIN, MAX with GROUP BY.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Force-pushed from cf4c194 to e7282e6
…e API

- Rename AggregateSpec to AggregateExpr for consistency
- Add helper constructors: substrait() and datafusion()
- Combine aggregate_substrait() and aggregate_expr() into single aggregate() method
- Update tests to use new API
- Create lance-datafusion/src/aggregate.rs for Aggregate struct
- Remove #[cfg(feature = "substrait")] from create_aggregate_plan
- create_aggregate_plan now works without substrait feature
Parse agg_func.sorts for ordered aggregates like ARRAY_AGG.
Add tests for FIRST_VALUE with ORDER BY ASC and DESC to verify the sorts parsing works correctly.
I think this is a good addition. The scanner creates very limited linear SQL plans (SCAN -> FILTER -> SORT? -> LIMIT? -> PROJECT) and so I think this still fits (SCAN -> FILTER -> SORT? -> LIMIT? -> PROJECT -> AGGREGATE?).
My only concern is making sure we have a good non-Substrait path. But we can add that in a follow-up too.
pub group_by: Vec<Expr>,
pub aggregates: Vec<Expr>,
Let's go ahead and comment these two.
/// Create an execution plan with aggregation.
///
/// Requires `aggregate()` to be called first.
pub fn create_aggregate_plan(&self) -> BoxFuture<'_, Result<Arc<dyn ExecutionPlan>>> {
Why do we need a separate create_aggregate_plan? Can we just append the aggregate expr onto the end of create_plan when aggregate is set? It would be surprising to me as a user if I called aggregate and then create_plan just ignored it.
Is there a use-case (like my plan_splits proposal?) that would benefit from separating this logic? Maybe create_aggregate_plan is called in plan_splits and create_plan? If the goal is to create two separate entrypoints, one for executing an expr and one for breaking it into multiple partitions, maybe there is utility. I agree that setting an aggregate and then having it not apply when calling create_plan would surprise me as a user.
Datafusion {
    group_by: Vec<Expr>,
    aggregates: Vec<Expr>,
    output_names: Vec<String>,
},
We can do this in a follow-up but there should be a way for callers to specify an AggregateExpr without needing to use DF or Substrait. Ideally with some kind of builder.
Datafusion {
    group_by: Vec<Expr>,
    aggregates: Vec<Expr>,
    output_names: Vec<String>,
Why is this needed? Datafusion Expr already has a name concept. In other words, if a user wants the MAX(temp) aggregate to be named max_temp they can do .alias("max_temp") on the Expr right?
    Ok(agg_expr)
}

/// Apply type coercion to aggregate arguments for UserDefined signature functions.
Why is this needed? Why don't we do type coercion on non-user defined functions?
    coerced_expr
};

let (agg_expr, _filter, _order_by) = create_aggregate_expr_and_maybe_filter(
What does it mean if _filter or _order_by are set? Do we need to utilize these?
        agg_func.params.null_treatment,
    )))
}
other => Ok(other.clone()),
Should it be an error if we hit this branch?
/// Output column names in order: group_by columns first, then aggregates.
pub output_names: Vec<String>,
I mention this in scanner.rs too but having output_names be a separate property is a Substrait thing. I think we could instead alias the expressions when we are parsing the Substrait. This keeps the logic in Scanner simpler.
/// Set aggregation.
pub fn aggregate(&mut self, aggregate: AggregateExpr) -> &mut Self {
    self.aggregate = Some(aggregate);
We should probably emit a warning or error if a user calls both aggregate and limit.
    )
})?;

let plan = self.create_plan().await?;
If a user calls order_by how should we interpret it? Ordering the data before the aggregate (what it is doing today) is just meaningless work. We could interpret it as a request to order the data after the aggregate? Or we could just return an error and say you can't do both? Or we could just log a warning and ignore the sort?
hamersaw left a comment:
Is the premise that some aggregates can be performed more efficiently by the scanner than by the distributed execution framework? I think in scenarios where we have to read all of the data it should be pretty similar performance-wise if this is done by a datafusion step or by the framework. There are specific cases, like MIN / MAX where we could serve from Zonemaps if there is no filter; is this the kind of query we're trying to optimize for?
westonpace left a comment:
Some minor nits but good to go otherwise. Thanks for tackling this!
/// Add a column to group by.
pub fn group_by(mut self, column: impl Into<String>) -> Self {
    self.group_by.push(col(column.into()));
    self
}

/// Add multiple columns to group by.
pub fn group_by_columns(
    mut self,
    columns: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
    for column in columns {
        self.group_by.push(col(column.into()));
    }
    self
}
Minor nit: can we comment that multiple invocations of group_by or group_by_columns will add to the list (and not replace it). E.g. .group_by("x").group_by_columns(["y", "z"]) will group by x, y, and z.
/// Add COUNT(column) aggregate.
pub fn count(self, column: impl Into<String>) -> AggregateExprBuilderWithPendingAggregate {
Minor nit: the difference between count_star and count is always subtle for newcomers to SQL. Can we expand this comment?
/// Add COUNT(column) aggregate.
///
/// Unlike count_star this will only return the number of rows where `column`
/// is not NULL
/// Builder state with a pending aggregate that can be aliased.
#[derive(Debug, Clone)]
pub struct AggregateExprBuilderWithPendingAggregate {
    builder: AggregateExprBuilder,
    pending: Expr,
}
Hmm, I don't love having two structs (and duplicating all the methods) but it isn't the end of the world. One solution I've seen to this problem has been to use const generics (not sure the below compiles but should communicate the idea)...
pub struct AggregateExprBuilder<const HAS_PENDING_AGG: bool> {
    group_by: Vec<Expr>,
    aggregates: Vec<Expr>,
}

impl AggregateExprBuilder<false> {
    pub fn new() -> Self {
        Self {
            group_by: Vec::default(),
            aggregates: Vec::default(),
        }
    }
}

impl<const HAS_PENDING_AGG: bool> AggregateExprBuilder<HAS_PENDING_AGG> {
    /// Add SUM(column) aggregate.
    pub fn sum(self, column: impl Into<String>) -> AggregateExprBuilder<true> {
        let mut aggregates = self.aggregates;
        aggregates.push(functions_aggregate::sum::sum(col(column.into())));
        AggregateExprBuilder::<true> {
            group_by: self.group_by,
            aggregates,
        }
    }
}

impl AggregateExprBuilder<true> {
    /// Set an alias for the pending (most recently added) aggregate.
    pub fn alias(mut self, name: impl Into<String>) -> Self {
        let pending = self.aggregates.pop().expect("has pending aggregate");
        self.aggregates.push(pending.alias(name.into()));
        self
    }
}
if self.aggregate.is_some() {
    if self.limit.is_some() || self.offset.is_some() {
        return Err(Error::InvalidInput {
            source: "Cannot use limit/offset with aggregate. Apply limit after aggregation instead.".into(),
Suggested change:
- source: "Cannot use limit/offset with aggregate. Apply limit after aggregation instead.".into(),
+ source: "Cannot use limit/offset with aggregate. Apply limit to the result instead.".into(),
    }
    if self.ordering.is_some() {
        return Err(Error::InvalidInput {
            source: "Cannot use order_by with aggregate. Apply ordering after aggregation instead.".into(),
Suggested change:
- source: "Cannot use order_by with aggregate. Apply ordering after aggregation instead.".into(),
+ source: "Cannot use order_by with aggregate. Apply ordering to the result instead.".into(),
// Stage 2.5: aggregate (if set, applies aggregate and returns early)
if let Some(agg_spec) = &self.aggregate {
    // Take columns needed for aggregation
    plan = self.take(plan, self.projection_plan.physical_projection.clone())?;
I think this is a no-op? We should always have the physical projection loaded already? I don't think aggregates are allowed to reference additional columns, so we shouldn't need a take here?
This is actually not a no-op: if we do an aggregate on top of vector search or full-text search, the source returns search results with scores rather than the full projection columns. The take() function handles both cases pretty gracefully since TakeExec::try_new returns None when no new columns are needed, causing it to simply return the original plan unchanged. So while it's often a no-op for simple scans, we keep it there for correctness with other source types, and the cost is just a minimal check.
I added a few new tests to cover those cases
Ah, ok. Normally we'd do that take at stage 5, so we are pulling it up here instead, which is fine since we prevent an aggregate with a limit (otherwise we'd be taking too much if we did that here).
All good!
// Stage 2: filter
plan = filter_plan.refine_filter(plan, self).await?;

// Stage 2.5: aggregate (if set, applies aggregate and returns early)
We should really just get rid of the whole "stage" concept in the comments 😆
plan = filter_plan.refine_filter(plan, self).await?;

// Stage 2.5: aggregate (if set, applies aggregate and returns early)
if let Some(agg_spec) = &self.aggregate {
I'm slightly torn on whether it makes sense to apply the aggregate before or after the projection. I fear we will have complaints either way.
If we apply the aggregate here (before projection) we cannot reference system columns or projected columns in the aggregate (e.g. can't do MAX(_rowoffset) or MAX(x * 2)). If we apply the aggregate later (stage 7.5) then we can't reference the aggregate in the projection (e.g. can't do 2 * MAX(x)).
I suppose it only really makes sense to be utilizing aggregates that can be pushed down and we wouldn't push down something like MAX(x * 2) anyways (I mean...we could...but I have no desire to do so).
Ok...I think I've convinced myself I like this how it is!
Thanks for the reviews!
The main use case of this PR is to allow engines like Spark and Trino to push an aggregate down into the Lance scanner running in a distributed worker when possible. Today we technically already support `COUNT(*)` pushdown through `scanner.count_rows()`, which counts the rows of each fragment in a distributed fashion; this is a more generic version of that. My plan is to allow an engine to pass a Substrait aggregate expression to the scanner in the worker, so other aggregations like `SUM`, `MAX`, and `MIN` can be pushed down as well.

Another alternative I considered is to update the `dataset.sql()` API to accept a full Substrait plan so we can execute a plan with an aggregate, and update the distributed worker to run a SQL statement instead of running the scanner. But doing this feature in the scanner feels more aligned with how engines implement distributed execution: whatever can be executed by a single worker in a distributed environment (predicate pushdown, column projection, aggregate pushdown) should be supported by the scanner.

Note that with this change we could technically remove `create_count_plan`, since it is just a subcase of `create_aggregate_plan`, but we are not doing that in this PR. Once we agree on this direction, I will refactor it in a separate PR.