-
Notifications
You must be signed in to change notification settings - Fork 2k
Rewrite physical expressions in execution plans #20009
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
@askalt, could you review the PR as well? |
|
cc @Omega359 (I remember you were also interested in placeholders within physical plans). |
Yes, I would like to be able to reuse plans - I'll try and find time in the next day or so to review this though I'm not particularly familiar with this area of code. |
7d86e73 to
eb09e71
Compare
eb09e71 to
3046ed0
Compare
3046ed0 to
4420b7f
Compare
e4dd613 to
2186826
Compare
|
Let's resolve threads with a thumb emoji. Look almost good to me, just several details are remained. |
|
@Jefffrey could you please approve CI workflow? |
2186826 to
dcc0262
Compare
askalt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! It looks good to me, will try to find maintainer to review it.
dcc0262 to
966c698
Compare
| }) | ||
| } | ||
|
|
||
| fn with_filter_and_projection( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not mentioned at all in the PR description. Can you please explain why this is needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the review! I have updated the PR description. This method allows to override the physical expressions within a file source. For example, we will be able to replace placeholder physical expressions. You can see it here (I removed ResolvePlaceholdersExec from the PR, but it worked by finding the placeholders and then replacing them with literals).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the call chain? Is it an optimizer rule that calls this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataSourceExec::with_physical_expressions -> FileScanConfig::with_physical_expressions -> FileSource::with_filter_and_projection.
This is just one of the implementations of ExecutionPlan::with_physical_expressions, we just need additional methods that rewrite expressions in internal entities.
| fn physical_expressions<'a>( | ||
| &'a self, | ||
| ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> { | ||
| None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would make sense to in some future version require every ExecutionPlan to implement this even if it's a no op. Otherwise it's easy to forget. I think enough ExecutionPlans would have expressions that it's worth forcing implementers to make a decision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted these changes to be non-breaking. At least, until we implemented a reliable mechanism using these methods (such as ResolvePlaceholdersExec or something else).
| fn physical_expressions<'a>( | ||
| &'a self, | ||
| ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> { | ||
| None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same note about removing default impl in the future.
| /// | ||
| /// The default implementation returns `None`, indicating that the node either has no physical | ||
| /// expressions or does not support exposing them. | ||
| fn physical_expressions<'a>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @gabotechs we were discussing adding just this the other day
| /// Returns new file source with given filter and projection. | ||
| fn with_filter_and_projection( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do add this I think it's worth making it very clear how this is intended to be used.
For example, users should not call this from their custom TableProvider implementation to pass in projections / filters. That pushdown is handled by optimizer rules. This is only to be used by ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the documentation of the method.
966c698 to
f92e2cd
Compare
| fn with_physical_expressions( | ||
| &self, | ||
| _params: ReplacePhysicalExpr, | ||
| ) -> Result<Option<Arc<dyn ExecutionPlan>>> { | ||
| Ok(None) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this API is enough to be used in all the current ExecutionPlan implementations.
Take AggregateExec for example:
| pub struct AggregateExec { |
It has:
- a Vec of
AggregateFunctionExprwhere each entry containsargs: Vec<Arc<dyn PhysicalExpr>> - the same Vec of
AggregateFunctionExpralso containsVec<PhysicalSortExpr>(with one expression each) - a
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>> - a
dynamic_filter
If AggregateExecb::physical_expressions returned something like vec![expr1, expr2, expr3, expr4], how would you know to which of all those components above each expression belongs? are the 4 expressions AggregateFunctionExprs? are the first two AggregateFunctionExprs, the third one a filter_expr and the last one a dynamic filter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree we need to be sure this is the right API before moving forward with it
how would you know to which of all those components above each expression belongs?
What would the use cases be that need to differentiate this? Would they be aggregate specific -> could they downcast the plan and call functions on AggregateExec directly?
If something needs to (1) operate on all expressions and (2) have specific logic depending on where that expression is that seems like a hard API to get right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If AggregateExecb::physical_expressions returned something like vec![expr1, expr2, expr3, expr4], how would you know to which of all those components above each expression belongs? are the 4 expressions AggregateFunctionExprs
A layout of the vector returned from ExecutionPlan::physical_expr(...) is plan specific. The API does not provide an ability to modify the layout itself: it could not be done generically, as each plan has different sorts of expressions, so if specific one should be modified -- the right way is to downcast the plan (caller knows which expression exactly is desired to be modified).
On other hand, this API provides an ability to switch some parts of the expressions prior the plan will be executed. For example, we are going to implement placeholders in the plans based on this API, as we can write the code like:
fn resolve_placeholders(expr: &Arc<dyn PhysicalExpr) -> Arc<dyn PhysicalExpr> {
...
}
let resolved_expr = plan.physical_expr().iter().map(|p| resolve_placeholders(p));
let plan = plan.with_physical_expr(resolved_expr);
plan.execute(...)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API does not provide an ability to modify the layout itself:
Yes, we require that the number of expressions returned by ExecutionPlan::physical_expressions match the number of expressions that we specify in ExecutionPlan::with_physical_expressions. If we specified the wrong number of expressions, the call returns an error. This invariant is enforced here (1, 2).
When we have exactly the same number of expressions, we simply replace the corresponding expressions with new ones.
f92e2cd to
f7d9a07
Compare
Introduces `physical_expressions` and `with_physical_expressions` methods to the `ExecutionPlan` trait. These methods provide an interface to inspect and replace physical expressions within execution plan nodes. The commit also implements these methods for various ExecutionPlans and introduces a new physical expression invariant check to ensure consistency between expression retrieval and replacement.
f7d9a07 to
4f4a887
Compare

Rationale for this change
This PR covers a part of #14342.
This PR introduces methods for replacing physical expressions in execution plan nodes. This is an intermediate step in the development of physical-level placeholders. Later, these methods will be used to resolve placeholders in physical plans.
What changes are included in this PR?
physical_expressionsandwith_physical_expressionsto theExecutionPlantrait to allow inspection and replacement of expressions within plan nodes.with_filter_and_projectionto theFileSourcetrait. This method is required to implementwith_physical_expressionsforDataSourceExec. Since file-based scans store their physical expressions (pushed-down filters and projections) within theFileSourceimplementation, we need a way to reconstruct theFileSourcewith updated expressions during plan rewriting.inner_expressions_iterandwith_inner_expressionsto theWindowExprtrait and its implementations to enable expression replacement within window functions.WindowAggExecandBoundedWindowAggExecnow use these methods to support theExecutionPlanrewriting interface.Are these changes tested?
physical_expressionsandwith_physical_expressionsmethods are verified using thecheck_physical_expressionsinvariant.Are there any user-facing changes?
Yes, a user can rewrite physical expressions in some execution plans.