-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add spilling to RepartitionExec #18014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @adriangb -- can you please add some "end to end" tests (aka that run a real query / LogicalPlan with a memory limit) and show that it still works?
|
Good call. I'll have to think about how we can force RepartitionExec specifically to spill. I've only experienced it with large datasets, lopsided hash keys, etc |
|
Should we get this PR as draft for now before spilling tests added? |
| (RepartitionBatch::Memory(batch), true) | ||
| } | ||
| Err(_) => { | ||
| // We're memory limited - spill this single batch to its own file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would we avoid out of memory error but risk "to many open files" with this approach, or I'm missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean because of spilling in general or the choice to spill each batch to its own file? In general DataFusion doesn't track number of files. I'm not sure if we keep the files open between writing and reading, I'd guess not. If we close them we only need as many file descriptors as files we spill concurrently - which should not be many (~ number of partitions)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, having a file per batch can get process to get killed (by the os) due to too many open files, also there are like 4 system calls per batch involved.
closing and then opening files for will bring at least 6 system calls per batch.
Ideally if we can keep file open and then share offsets after write, would save lot of syscals, plus we could keep a file per partition. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, having a file per batch can get process to get killed (by the os) due to too many open files
I checked and indeed we close the files after writing so this is not going to happen. the only thing we accumulate is PathBufs, not open file descriptors.
closing and then opening files for will bring at least 6 system calls per batch
Ideally if we can keep file open and then share offsets after write, would save lot of syscals
I think that would be nice but I can't think of a scheme that would make that work: reading from the end of the file isn't compatible with our use case because we need it to be FIFO but using a single file only really works if you do LIFO. Do you have any ideas on how we could do this in a relatively simple way?
Given that any query that would spill here would have previously errored out I'm not too worried about performance. And frankly I think if batch sizes are reasonable a couple extra sys calls won't be measurable.
That said since how the spilling happens is all very internal / private there's no reason we can't merge this and then come back and improve it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would file be LIFO if you send offset, I'm not sure I understand. Writing single batch to a file,opening closing still seams excessive to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shuffle exchange in spark or ballista will spill whole repartition to disk if I'm not mistaken I guess spilling in this case would not be different, apart from fact that this is triggered due to memory constraints.
Maybe we could learn something from new real-time mode in spark https://docs.google.com/document/d/1CvJvtlTGP6TwQIT4kW6GFT1JbdziAYOBvt60ybb7Dw8/edit?usp=sharing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is spark jira https://issues.apache.org/jira/browse/SPARK-52330 with link to design document https://docs.google.com/document/d/1CvJvtlTGP6TwQIT4kW6GFT1JbdziAYOBvt60ybb7Dw8/edit?usp=sharing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One alternative could be to do a spill file per-channel and have some sort of gc process where we say "if the spill file exceeds XGB and/or we have more than YGB of junk space we pay the price of copying over into a new spill file to keep disk usage from blowing up". That might be a general approach to handle cases like #18011 as well. But I'm not sure the complexity is worth it. If that happens even once I feel that it will vastly exceed the cost of the extra sys calls to create and delete files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one idea, would it be possible to spill before actual repartition? Try to acquire memory equal to received batch, if it fails do not do repartition, spill it. That would produce just N file (where N is number of partition)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One alternative could be to do a spill file per-channel and have some sort of gc process where we say "if the spill file exceeds XGB and/or we have more than YGB of junk space we pay the price of copying over into a new spill file to keep disk usage from blowing up".
You could avoid a copying gc process maybe via:
- Keep a LIFO list of spill files per channel
- write new batches to the end of the most recent file until its size exceeds some threshold (e.g, 100MB). When the threshold is exceed make a new file
- Read from the oldest file until all batches have been read. Once all batches have been read from a file it can be deleted.
Sure thing. Note that there are tests, just not e2e SLT tests. |
Not sure how to do it from mobile. Will do next time I'm at my computer. |
|
@alamb I cooked up an e2e test, let me know if it looks okay: https://github.com/apache/datafusion/pull/18014/files#diff-e8fa07bcbe7db22bdd43eacfa713d380acaf651770da26cb06957030034ebb93 |
4091979 to
bcfd64a
Compare
|
I have a question: are we assuming that it's not possible to make RepartitionExec memory-constant (i.e., O(n_partitions * batch_size) memory)? Is this due to an engineering limitation, or is it theoretically impossible? |
I think it's theoretically impossible. If one partition is faster than another the data flowing into the slow partition has to accumulate somewhere. I consider the possibility of making the channels bounded, but I'm afraid that would result in deadlocks. |
|
Not really related to this PR specifically, but here are my two cents about some things that could be happening:
Not completely sure, but I'd say it should not be making things much worst. One thing that
Note that currently DataFusion is capable of severely over-accounting memory under certain situations (more info in #16841). So even if this PR has value itself, It'd be worth to double check you are not hitting one of those situations. |
Yeah I'm not sure either, I was just mentioning that in passing, but since there are other examples (#17334 (comment)) of this happening in plain DataFusion it made sense to me to implement this, especially since the only cases that will hit the spilling are cases that would have failed previously. |
|
🤖 |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @adriangb for this contribution
I agree this is better than what is on main as some queries that would have straightup failed will now have a chance of running.
As long as the benchmark runs don't show any performance regressions I think we could proceed with merging this PR in.
The only thing I think needs to be done is to restore the oom test (though I do think we could do it as a follow on PR)
That being said, I also agree with @milenkovicm, @gabotechs and @Omega359 that using a single file for each spilled RecordBatch is likely going to be wildly inefficient (and really slow).
What I suggest is:
- Merge this PR
- File a follow on ticket describing a more sophisticated algorithm for reading/writing spill files to reduce the syscall overhead, (e.g. writing multiple batches to the same file when possible, etc) leaving a ticket reference in the comments
- Maybe add a logging message or some other hint when the spilling happens to help debugging
Then if/when we see queries fail / have bad performance due to this per-batch spilling, we can base the follow on performance enhancement work on real-world experiences / actual usecases.
cc @crepererum as you are familar with this work
| .create_logical_plan("SELECT c1, count(*) as c FROM t GROUP BY c1;") | ||
| .await | ||
| .unwrap(); | ||
| let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend also verifying here that there are exactly two partitions in the final plan
| let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); | |
| let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); | |
| assert_eq!(plan.output_partitioning().partition_count(), 2); |
| /// Just as some examples of real world scenarios where this can happen consider | ||
| /// lopsided groups in a group by especially if one partitions spills and others don't, | ||
| /// or in distributed systems if one upstream node is slower than others. | ||
| #[tokio::test] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a neat test. However, for some reason it takes over 5 seconds to run on laptop.
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ cargo test --test core_integration -- repartition_memory
Finished `test` profile [unoptimized + debuginfo] target(s) in 0.21s
Running tests/core_integration.rs (target/debug/deps/core_integration-97767bbb2ea8803a)
running 1 test
test memory_limit::repartition_mem_limit::test_repartition_memory_limit ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 704 filtered out; finished in 4.99s. <--- 5 seconds!!!
I suspect it is because it is creating something like 1M / 32 = 31,250 files
Is there any way to we can reduce the runtime (like maybe only use 1000 rows, or 10,000 rows?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, takes 0.62s on my laptop now
| SharedMemoryReservation, | ||
| ), | ||
| >, | ||
| channels: HashMap<usize, PartitionChannels>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is much nicer
| (RepartitionBatch::Memory(batch), true) | ||
| } | ||
| Err(_) => { | ||
| // We're memory limited - spill this single batch to its own file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One alternative could be to do a spill file per-channel and have some sort of gc process where we say "if the spill file exceeds XGB and/or we have more than YGB of junk space we pay the price of copying over into a new spill file to keep disk usage from blowing up".
You could avoid a copying gc process maybe via:
- Keep a LIFO list of spill files per channel
- write new batches to the end of the most recent file until its size exceeds some threshold (e.g, 100MB). When the threshold is exceed make a new file
- Read from the oldest file until all batches have been read. Once all batches have been read from a file it can be deleted.
| match &mut self.state { | ||
| RepartitionStreamState::ReceivingFromChannel => { | ||
| match self.input.recv().poll_unpin(cx) { | ||
| Poll::Ready(Some(Some(v))) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could reduce the level of indenting here using the ready! macro which would make it more readable
| match &mut self.state { | ||
| RepartitionStreamState::ReceivingFromChannel => { | ||
| match self.receiver.recv().poll_unpin(cx) { | ||
| Poll::Ready(Some(Some(v))) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment here about using ready! macro to simpify this loop
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn oom() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is important to leave the oom test here to verify that the system will still get an OOM error when no disk manager is enabled
|
🤖: Benchmark completed Details
|
Co-authored-by: Bruce Ritchie <bruce.ritchie@veeva.com>
bcfd64a to
8f81a36
Compare
Suspicious 🤔 |
I have a followup ready using the approach suggested in #18014 (comment): pydantic#40 I'll go ahead and merge this PR to avoid needing re-review and we can continue in that one. |
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
Addresses apache#17334 (comment) I ran into this using `datafusion-distributed` which I think makes the issue of partition execution time skew even more likely to happen. As per that issue it can also happen with non-distributed queries, e.g. if one partition's sort spills and others don't. Due to the nature of `ReparitionExec` I don't think we can bound the channels, that could lead to deadlocks. So what I did was at least make queries that would have previously fail continue forward with disk spilling. I did not account for memory usage when reading batches back from disk since DataFusion in general does not generally account for "in-flight" batches. Written with help from Claude --------- Co-authored-by: Bruce Ritchie <bruce.ritchie@veeva.com>
Addresses apache#17334 (comment) I ran into this using `datafusion-distributed` which I think makes the issue of partition execution time skew even more likely to happen. As per that issue it can also happen with non-distributed queries, e.g. if one partition's sort spills and others don't. Due to the nature of `ReparitionExec` I don't think we can bound the channels, that could lead to deadlocks. So what I did was at least make queries that would have previously fail continue forward with disk spilling. I did not account for memory usage when reading batches back from disk since DataFusion in general does not generally account for "in-flight" batches. Written with help from Claude --------- Co-authored-by: Bruce Ritchie <bruce.ritchie@veeva.com>
Addresses #18014 (comment), potentially paves the path to solve #18011 for other operators as well --------- Co-authored-by: Yongting You <2010youy01@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Addresses apache#18014 (comment), potentially paves the path to solve apache#18011 for other operators as well --------- Co-authored-by: Yongting You <2010youy01@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Addresses apache#17334 (comment) I ran into this using `datafusion-distributed` which I think makes the issue of partition execution time skew even more likely to happen. As per that issue it can also happen with non-distributed queries, e.g. if one partition's sort spills and others don't. Due to the nature of `ReparitionExec` I don't think we can bound the channels, that could lead to deadlocks. So what I did was at least make queries that would have previously fail continue forward with disk spilling. I did not account for memory usage when reading batches back from disk since DataFusion in general does not generally account for "in-flight" batches. Written with help from Claude --------- Co-authored-by: Bruce Ritchie <bruce.ritchie@veeva.com>
Addresses #17334 (comment)
I ran into this using
datafusion-distributedwhich I think makes the issue of partition execution time skew even more likely to happen. As per that issue it can also happen with non-distributed queries, e.g. if one partition's sort spills and others don't.Due to the nature of
ReparitionExecI don't think we can bound the channels, that could lead to deadlocks. So what I did was at least make queries that would have previously fail continue forward with disk spilling. I did not account for memory usage when reading batches back from disk since DataFusion in general does not generally account for "in-flight" batches.Written with help from Claude