feat: retry merge_insert when possible #3614
Conversation
Force-pushed 20581e8 to aa9fe97 (compare)
Codecov Report. Attention: Patch coverage is …
Additional details and impacted files@@ Coverage Diff @@
## main #3614 +/- ##
==========================================
+ Coverage 78.50% 78.57% +0.07%
==========================================
Files 268 272 +4
Lines 100735 101868 +1133
Branches 100735 101868 +1133
==========================================
+ Hits 79078 80041 +963
- Misses 18538 18670 +132
- Partials 3119 3157 +38
Flags with carried forward coverage won't be shown.
| }, | ||
| } | ||
| impl std::fmt::Display for Operation { |
nit: we could get this for free from strum_macros but would need to add an explicit dependency
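To illustrate the nit above: what strum_macros would derive for free is essentially a hand-written Display impl like the one below. The Operation variants here are a hypothetical subset for illustration, not the actual enum from the diff.

```rust
use std::fmt;

// Hypothetical subset of the Operation enum, for illustration only.
enum Operation {
    Append,
    Overwrite,
    Rewrite,
}

// Roughly what `#[derive(strum_macros::Display)]` would generate,
// written out by hand here to avoid the extra dependency.
impl fmt::Display for Operation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Operation::Append => "Append",
            Operation::Overwrite => "Overwrite",
            Operation::Rewrite => "Rewrite",
        };
        f.write_str(name)
    }
}

fn main() {
    println!("{}", Operation::Append); // prints "Append"
}
```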
| )) as SendableRecordBatchStream | ||
| }))) | ||
| } else { | ||
| // TODO: allow buffering up to 100MB in memory before spilling to disk. |
Does this not do any in-memory buffering at all right now? 100MB seems small for some server-side use-cases, and we should make it configurable.
It doesn't do any in-memory buffering, no.
100MB seems small for some server-side use-cases
Why do you say that? What's the downside of spilling to disk for over 100MB of data? The operations will still work, and should be plenty fast.
I mean, we may want to buffer more than 100MB in memory, and we will want some way to configure it, possibly as a global pool across multiple datasets
we may want to buffer more than 100MB in memory
I was wondering more if you think it's dangerous to ship as-is. The only times I can think of where you might want to change this seem to be edge cases:
- You are writing 1GB of data, but the latency hit from writing to disk is meaningful. If the final destination is object storage and your cache is an SSD, or even just an HDD, I don't think this is true. It could be true if the final destination is also an SSD.
- You are writing 10GB of data, and you have 10GB of memory available but not 10GB of disk available. I think 99% of the time a computer will have more disk than memory available.
Are there others you have in mind?
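The buffering policy being discussed (the TODO in the diff) can be sketched as a byte counter with a configurable threshold: batches stay in memory until the limit is crossed, then a spill starts. The Buffer type and the 100MB default below are illustrative placeholders, not the actual Lance API.

```rust
// Default limit under discussion: buffer up to 100 MB before spilling.
const DEFAULT_MEMORY_LIMIT: usize = 100 * 1024 * 1024;

struct Buffer {
    limit: usize,
    buffered_bytes: usize,
    spilled: bool,
}

impl Buffer {
    fn new(limit: usize) -> Self {
        Self { limit, buffered_bytes: 0, spilled: false }
    }

    // Returns true if this push crossed the limit and a spill should start.
    fn push(&mut self, batch_size: usize) -> bool {
        self.buffered_bytes += batch_size;
        if !self.spilled && self.buffered_bytes > self.limit {
            self.spilled = true;
            return true;
        }
        false
    }
}

fn main() {
    let mut buf = Buffer::new(DEFAULT_MEMORY_LIMIT);
    assert!(!buf.push(60 * 1024 * 1024)); // 60 MB total: stays in memory
    assert!(buf.push(60 * 1024 * 1024));  // 120 MB total: spill triggered
}
```

Making `limit` a constructor argument is what would let it be configured per server, or drawn from a global pool as suggested above.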
| .ok() | ||
| .expect_ok()??; | ||
| let tmp_path = tmp_dir.path().join("spill.arrows"); |
Where/when does this get cleaned up? We should discuss how we will respond to disk-full errors.
When the SpillStreamIter is dropped, the inner tempfile::TempDir is dropped, at which point the temporary directory will be deleted.
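The cleanup-on-drop behavior described here is RAII: tempfile::TempDir deletes its directory in its Drop impl. A minimal std-only stand-in (TempDirGuard is a toy, not the tempfile API) shows the same lifecycle:

```rust
use std::fs;
use std::path::PathBuf;

// Toy stand-in for tempfile::TempDir: when the guard is dropped, the
// directory (and the spill file inside it) is removed.
struct TempDirGuard {
    path: PathBuf,
}

impl TempDirGuard {
    fn new(path: PathBuf) -> std::io::Result<Self> {
        fs::create_dir_all(&path)?;
        Ok(Self { path })
    }
}

impl Drop for TempDirGuard {
    fn drop(&mut self) {
        // Ignore errors on cleanup, as tempfile does by default.
        let _ = fs::remove_dir_all(&self.path);
    }
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("spill_demo_dir");
    {
        let guard = TempDirGuard::new(dir.clone())?;
        fs::write(guard.path.join("spill.arrows"), b"data")?;
        assert!(dir.exists());
    } // guard dropped here: directory deleted
    assert!(!dir.exists());
    Ok(())
}
```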
rpgreen
left a comment
Nice! I think we will want to avoid spilling to disk up to some threshold that we can configure.
Commits: wip; wip: utilities; wip: stream; finish spill; handle errors through spill; test background iter preserves size_hint; test more; clippy; fix lifetime issues; add missing file; finish commit stuff; fix transaction conflict handling; lint
Force-pushed bea7daa to 112ae18 (compare)
westonpace
left a comment
A bunch of nits but overall this looks pretty good. Some nice helper utilities in here as well. Thanks!
| /// * This counts the **total** size of the buffers, even if the array is a slice. | ||
| /// Round-tripped data may use less memory because of this. | ||
| #[derive(Default)] | ||
| pub struct MemoryAccumulator { |
What's the motivation to use this instead of get_array_memory_size? Is it because you are worried about shared buffers?
Yeah, shared / sliced buffers. Worried we or the user might have something that sliced the input data into smaller batches. Naively using get_array_memory_size will double count in those cases, as SaintBacchus found while attempting this PR: #3435 (comment)
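The double-counting problem can be shown without Arrow at all: model each sliced batch as a handle to a shared Arc'd buffer. A naive sum over per-array sizes counts the shared buffer once per slice, while deduplicating by buffer pointer (the MemoryAccumulator-style approach) counts it once. This is an illustration of the idea, not the Lance implementation.

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Naive accounting: sums each slice's buffer size independently,
// so a buffer shared by two slices is counted twice.
fn naive_total(slices: &[Arc<Vec<u8>>]) -> usize {
    slices.iter().map(|b| b.len()).sum()
}

// Deduplicated accounting: each distinct underlying buffer
// (identified by pointer) is counted exactly once.
fn dedup_total(slices: &[Arc<Vec<u8>>]) -> usize {
    let mut seen = HashSet::new();
    let mut total = 0;
    for b in slices {
        if seen.insert(Arc::as_ptr(b)) {
            total += b.len();
        }
    }
    total
}

fn main() {
    let buffer = Arc::new(vec![0u8; 1024]);
    // Two batches "sliced" from the same underlying buffer.
    let slices = vec![buffer.clone(), buffer.clone()];
    assert_eq!(naive_total(&slices), 2048); // double counted
    assert_eq!(dedup_total(&slices), 1024); // counted once
}
```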
| /// | ||
| /// If the spill has been dropped, an error will be returned. |
Dropped at all? Or dropped without finish being called? Wouldn't the writer normally call finish and then drop the write end?
Dropped at all. The state (whether or not it's spilled / finish writing) is held in a channel, that becomes inaccessible when dropped. We could probably change this in a future version if we want, but for now you have to keep the sender alive even after calling finish().
That doesn't seem right. Though I just noticed you are using tokio::sync::watch::channel? Why watch? Won't that potentially drop data? Why not mpsc channel?
Oh...I see...this is just for the status. Ok, I was confusing myself and thought you were sending batches over the channel. Now it makes sense.
| /// Start a spill of Arrow data to a temporary file. The file is an Arrow IPC | ||
| /// stream file. | ||
| /// | ||
| /// Up to `memory_limit` bytes of data can be buffered in memory before a spill | ||
| /// is created. If the memory limit is never reached before [`SpillSender::finish()`] | ||
| /// is called, then the data will simply be kept in memory and no spill will be | ||
| /// created. | ||
| /// | ||
| /// The [`SpillSender`] allows you to write batches to the spill. | ||
| /// | ||
| /// The [`SpillReceiver`] can open a [`SendableRecordBatchStream`] that reads | ||
| /// batches from the spill. This can be opened before, during, or after batches | ||
| /// have been written to the spill. | ||
| /// | ||
| /// Once [`SpillSender`] is dropped, the temporary file is deleted. This will | ||
| /// cause the [`SpillReceiver`] to return an error if it is still open. |
Mention in here somewhere that path is the path the data will be written to?
| self.state = SpillState::Spilling { | ||
| writer, | ||
| batches_written, | ||
| }; | ||
| if let SpillState::Spilling { | ||
| writer, | ||
| batches_written, | ||
| } = &mut self.state | ||
| { | ||
| (writer, batches_written) | ||
| } else { | ||
| unreachable!() | ||
| } |
This is weird to set the enum and then immediately turn around and if let it but I can't think of a better way 😆
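The pattern under discussion reproduces in miniature like this (toy State/Spill types, not the actual spill code): values are moved into the enum variant, then immediately borrowed back out with an if-let. The unreachable!() arm is safe only because the state was set one line earlier.

```rust
enum State {
    Idle,
    Spilling { batches_written: usize },
}

struct Spill {
    state: State,
}

impl Spill {
    // Move into the new state, then borrow the fields back out.
    // The `else` arm can never run: we just set the state above.
    fn start_spilling(&mut self, batches_written: usize) -> &mut usize {
        self.state = State::Spilling { batches_written };
        if let State::Spilling { batches_written } = &mut self.state {
            batches_written
        } else {
            unreachable!()
        }
    }
}

fn main() {
    let mut spill = Spill { state: State::Idle };
    let counter = spill.start_spilling(0);
    *counter += 1;
    if let State::Spilling { batches_written } = &spill.state {
        assert_eq!(*batches_written, 1);
    }
}
```

The borrow checker forces this shape: you cannot hold a mutable borrow of the old state while constructing and assigning the new one, so the re-match after assignment is the idiomatic workaround.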
| // TODO(rmeng): check that the new indices isn't on the column being replaced | ||
| true | ||
| NotCompatible | ||
| } | ||
| Operation::Rewrite { .. } => { | ||
| // TODO(rmeng): check that the fragments being replaced are not part of the groups | ||
| true | ||
| NotCompatible | ||
| } | ||
| Operation::DataReplacement { .. } => { | ||
| // TODO(rmeng): check cell conflicts | ||
| true | ||
| NotCompatible | ||
| } | ||
| _ => true, | ||
| _ => NotCompatible, |
These are all NotCompatible because this is still kind of half-finished right? It seems DataReplacement could be retried in many cases?
Yeah I figured I'll rebase #3631 on this and finish it there.
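The shape of the change in this hunk is replacing a boolean conflict check with an enum so callers can distinguish retryable from fatal conflicts. A simplified sketch (placeholder Operation variants and classifications, not the real conflict rules):

```rust
#[derive(Debug, PartialEq)]
enum ConflictResult {
    Compatible,
    Retryable,
    NotCompatible,
}

enum Operation {
    Append,
    Rewrite,
    DataReplacement,
}

// Toy classifier: which concurrent operations allow a retry.
fn conflicts_with(other: &Operation) -> ConflictResult {
    use ConflictResult::*;
    match other {
        // Placeholder: an append never touches existing rows,
        // so retrying against it can succeed.
        Operation::Append => Retryable,
        // Conservative default until finer-grained checks
        // (like the DataReplacement TODO above) are implemented.
        Operation::Rewrite | Operation::DataReplacement => NotCompatible,
    }
}

fn main() {
    assert_eq!(conflicts_with(&Operation::Append), ConflictResult::Retryable);
    assert_eq!(
        conflicts_with(&Operation::Rewrite),
        ConflictResult::NotCompatible
    );
}
```

The value of the enum over a bool is that the retry loop can match on Retryable specifically, while NotCompatible fails fast instead of burning through retry attempts.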
| // this struct. When this struct is dropped, the Drop implementation of | ||
| // tempfile::TempDir will delete the temp dir. | ||
| #[allow(dead_code)] // Exists to keep the temp dir alive | ||
| tmp_dir: tempfile::TempDir, |
We may want this to be configurable at some point in the future.
Yeah I figure once we have a DataFusion SessionContext in the Session, I can use the DataFusion DiskManager here.
westonpace
left a comment
Not a big deal for this PR but I think the utilities in spill are not what I would normally expect as "spill". I think of "spill" as something that is written to and then read from. We wouldn't write batches we've already read. It's a temporary structure meant for one-time execution.
I think a more accurate description might be "temporary table". Though maybe that is a distinction without merit. We are first writing the batches to a temporary table and then playing back from that temporary table to execute the operation.
Another analogy could be SQL server's spool operator (an operator that uses a temporary table to store data and then that table will be read multiple times throughout the execution of a plan) which is used for a slightly different purpose (to share the output of a node with multiple readers) but is implemented in much the same way.
| let reader = AsyncStreamReader::open(spill_path.clone()).await?; | ||
| // Skip batches we've already read. | ||
| for _ in 0..self.batches_read { | ||
| reader.read().await?; | ||
| } | ||
| self.state = SpillReaderState::Reader { reader }; |
Ok...so if we've read batches 0, 1, 2, 3, 4...
And then the writer decides to spill. It will still write batches 0, 1, 2, 3, 4?
This makes sense (as this will potentially need to be replayed multiple times) but it was confusing.
I'll add a comment explaining that.
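The replay logic in this hunk reduces to: the spill file always contains every batch from the start, so after switching from the in-memory path to the file, the reader re-opens the stream and skips the batches it already handed out. Modeling batches as integers (purely for illustration):

```rust
// Resume reading a replayed stream: the spill always starts from batch 0,
// so skip the `batches_read` already returned before the switch to disk.
fn resume_from<I: Iterator<Item = i32>>(stream: I, batches_read: usize) -> Vec<i32> {
    stream.skip(batches_read).collect()
}

fn main() {
    // Spill file contains all batches, even the 5 already read in memory.
    let spill = vec![0, 1, 2, 3, 4, 5, 6];
    assert_eq!(resume_from(spill.into_iter(), 5), vec![5, 6]);
}
```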
That's a fair point. I think I can rename the function.
Part of #3397

- Extracted the Backoff utility into a separate struct.
- Backoff now starts at 50ms, then 100ms, 200ms, 400ms (previously it started at 100ms).
- Changed Transaction::conflicts_with() to return an enum that differentiates retryable and non-retryable conflicts.
- Made merge_insert retry on retryable conflicts up to 10 times, after which it returns a TooMuchContention error.
- Fixed background_iterator so that it preserves size_hint().
- Improved CommitConflict so it's easier to see which operations conflicted.
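The backoff schedule described above (50ms, 100ms, 200ms, 400ms, doubling each attempt, capped at 10 retries before TooMuchContention) can be sketched as below. The real Backoff struct likely adds jitter and a delay cap; this shows only the base doubling, and the names are illustrative.

```rust
use std::time::Duration;

// Retry limit from the PR description: 10 attempts, then give up.
const MAX_RETRIES: u32 = 10;

// Base delay doubles each attempt, starting at 50ms.
fn backoff_delay(attempt: u32) -> Duration {
    Duration::from_millis(50u64 << attempt.min(MAX_RETRIES))
}

fn main() {
    let delays: Vec<u64> = (0..4)
        .map(|a| backoff_delay(a).as_millis() as u64)
        .collect();
    assert_eq!(delays, vec![50, 100, 200, 400]);
}
```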