Pipeline-friendly Bounded Memory Window Executor #4777
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,7 +47,7 @@ use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; | |
| use crate::physical_plan::projection::ProjectionExec; | ||
| use crate::physical_plan::repartition::RepartitionExec; | ||
| use crate::physical_plan::sorts::sort::SortExec; | ||
| - use crate::physical_plan::windows::WindowAggExec; | ||
| + use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; | ||
| use crate::physical_plan::{joins::utils as join_utils, Partitioning}; | ||
| use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr}; | ||
| use crate::{ | ||
|
|
@@ -614,13 +614,28 @@ impl DefaultPhysicalPlanner { | |
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| - Ok(Arc::new(WindowAggExec::try_new( | ||
| - window_expr, | ||
| - input_exec, | ||
| - physical_input_schema, | ||
| - physical_partition_keys, | ||
| - physical_sort_keys, | ||
| - )?)) | ||
| + let uses_bounded_memory = window_expr | ||
|
Contributor
I wonder if it would be cleaner to leave the conversion to That would both keep the physical plan simpler as well as ensure all the cases you care about are covered in datafusion/core/src/physical_optimizer/optimize_sorts.rs
Contributor
The challenge with doing this in However, I agree with your general line of thinking. We are currently experimenting with various ways to simplify/reorganize rules so that this can happen not in the planner but in a rule (like We will submit a follow-on PR when we have something mature.
||
| + .iter() | ||
| + .all(|e| e.uses_bounded_memory()); | ||
| + // If all window expressions can run with bounded memory, | ||
| + // choose the bounded window variant: | ||
| + Ok(if uses_bounded_memory { | ||
| + Arc::new(BoundedWindowAggExec::try_new( | ||
| + window_expr, | ||
| + input_exec, | ||
| + physical_input_schema, | ||
| + physical_partition_keys, | ||
| + physical_sort_keys, | ||
| + )?) | ||
| + } else { | ||
| + Arc::new(WindowAggExec::try_new( | ||
| + window_expr, | ||
| + input_exec, | ||
| + physical_input_schema, | ||
| + physical_partition_keys, | ||
| + physical_sort_keys, | ||
| + )?) | ||
| + }) | ||
| } | ||
| LogicalPlan::Aggregate(Aggregate { | ||
| input, | ||
|
|
||
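The selection rule in the hunk above can be tried in isolation. The following is a minimal sketch, not the planner code itself: only the method name `uses_bounded_memory` is taken from the diff, while the `WindowExpr` trait shown here, `MockExpr`, `ExecChoice`, `exprs`, and `choose_window_exec` are hypothetical stand-ins introduced for illustration.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the real `WindowExpr` trait; only the
// `uses_bounded_memory` method name comes from the diff.
trait WindowExpr {
    fn uses_bounded_memory(&self) -> bool;
}

// Hypothetical expression whose answer is fixed at construction time.
struct MockExpr {
    bounded: bool,
}

impl WindowExpr for MockExpr {
    fn uses_bounded_memory(&self) -> bool {
        self.bounded
    }
}

// Hypothetical label for which executor the planner would build.
#[derive(Debug, PartialEq)]
enum ExecChoice {
    Bounded,   // corresponds to BoundedWindowAggExec in the diff
    Unbounded, // corresponds to WindowAggExec in the diff
}

// Mirrors the `.iter().all(|e| e.uses_bounded_memory())` check:
// one expression that cannot run with bounded memory forces the fallback.
fn choose_window_exec(window_expr: &[Arc<dyn WindowExpr>]) -> ExecChoice {
    if window_expr.iter().all(|e| e.uses_bounded_memory()) {
        ExecChoice::Bounded
    } else {
        ExecChoice::Unbounded
    }
}

// Helper that builds a list of mock expressions from boolean flags.
fn exprs(flags: &[bool]) -> Vec<Arc<dyn WindowExpr>> {
    flags
        .iter()
        .map(|&bounded| Arc::new(MockExpr { bounded }) as Arc<dyn WindowExpr>)
        .collect()
}

fn main() {
    println!("{:?}", choose_window_exec(&exprs(&[true, true])));
    println!("{:?}", choose_window_exec(&exprs(&[true, false])));
}
```

Because `Iterator::all` is vacuously true on an empty iterator, this sketch picks the bounded variant for an empty expression list. Whether the planner can ever reach this branch with no window expressions is not shown in the diff.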