ARROW-12289: [C++] Create basic AsyncScanner implementation #10008
Conversation
ARROW-11797 uses the param to toggle UseThreads, so this will have to become a std::pair<bool, bool> (or really, just a custom struct) in the end.
Ah, yes. I was planning on adding whether to scan with Scan (to ensure we still test the legacy), ScanBatches, or ScanBatchesUnordered as a parameter as well.
I'll tackle that when I rebase ARROW-11797
I've absorbed UseThreads into the matrix.
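As an illustration of the "custom struct" idea, a value-parameterized test matrix could look roughly like the following (a GoogleTest sketch; the struct and field names are hypothetical, not the ones actually used):

```cpp
// Hypothetical sketch of a value-parameterized scanner test matrix.
#include <gtest/gtest.h>

struct TestScannerParams {
  bool use_async;    // SyncScanner vs. AsyncScanner
  bool use_threads;  // toggle UseThreads
};

class TestScanner : public ::testing::TestWithParam<TestScannerParams> {};

TEST_P(TestScanner, ScansAllBatches) {
  const TestScannerParams& params = GetParam();
  // ... construct a scanner according to params and verify the scanned
  // batches match the expected data ...
  (void)params;
}

INSTANTIATE_TEST_SUITE_P(
    ScannerMatrix, TestScanner,
    ::testing::Values(TestScannerParams{false, false},
                      TestScannerParams{false, true},
                      TestScannerParams{true, false},
                      TestScannerParams{true, true}));
```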
Force-pushed from dd525b3 to a040abf
cpp/src/arrow/dataset/file_base.cc (outdated)
Don't we need to check again if NextSync/NextAsync return the end marker? Otherwise, operator() will return a Future that resolves to the end marker and the consumer will stop early.
There is a silent precondition here that every fragment scan should return scan tasks that return at least 1 record batch (unless the entire fragment is empty in which case either 0 scan tasks or 1 scan task with 0 batches should both be ok).
I know this precondition holds for IPC and CSV (by virtue of there being only one scan task) but I wasn't sure about Parquet (i.e. can a pushed-down filter cause a batch-less scan task to be emitted in the middle of a set of scan tasks?)
I may be reading this wrong, but when we finish one scan task and move on to the next, since we just fall through here, we'll return a completed Future which contains a nullptr, which gets returned as the result of operator(). So the generator's consumer will think that the generator has ended, even though we still have more scan tasks.
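To make the concern concrete, here is a minimal, self-contained sketch (plain synchronous C++ with placeholder types, not the actual code in this PR) of a flattening generator that advances to the next scan task instead of falling through to the end marker:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

using Batch = int;                    // stand-in for a record batch
using ScanTask = std::vector<Batch>;  // stand-in for one scan task's batches

// A flattening "generator": yields batches across scan tasks and must only
// return the end marker (std::nullopt) once every task is exhausted.
class FlatteningGenerator {
 public:
  explicit FlatteningGenerator(std::vector<ScanTask> tasks)
      : tasks_(std::move(tasks)) {}

  std::optional<Batch> operator()() {
    while (task_index_ < tasks_.size()) {
      if (batch_index_ < tasks_[task_index_].size()) {
        return tasks_[task_index_][batch_index_++];
      }
      // Current scan task exhausted: advance to the next one instead of
      // falling through and emitting the end marker prematurely.
      ++task_index_;
      batch_index_ = 0;
    }
    return std::nullopt;  // genuine end of the fragment
  }

 private:
  std::vector<ScanTask> tasks_;
  std::size_t task_index_ = 0;
  std::size_t batch_index_ = 0;
};

int main() {
  FlatteningGenerator gen({{1, 2}, {}, {3}});             // empty middle task
  while (auto batch = gen()) std::cout << *batch << " ";  // prints: 1 2 3
}
```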
Oh. I think you are right. I should probably add a scanner unit test that generates more than one scan task. I'll work on that.
I see that there's now a parameter to generate multiple scan tasks per fragment in InMemoryDataset - however, is that necessary? For one, it doesn't affect this code path, since this only affects file fragments. For another, it doesn't affect the scanner, which doesn't use scan tasks (directly); it'll use ScanBatchesAsync on the Fragment, which flattens all the scan tasks itself anyways.
So I think the issue pointed out here doesn't show up in tests purely because only Parquet fragments expose multiple scan tasks per fragment right now.
Fair point. I think it's still necessary but could be renamed as it is a bit vague. In the async case it is "batches per fragment" and in the sync case it is "scan tasks per fragment". It was enough to break the async scanner (it currently fails these tests). I also agree it doesn't expose this issue, so I'll add some more tests.
Well, technically speaking the # of batches per fragment depends both on this and max batch size. So I suppose we could have gotten sufficient testing by setting the max batch size small enough. At the moment these tests help exercise the SyncScanner if nothing else.
Ah good point. It is mostly just a nit as it's really a testing parameter that's unfortunately getting exposed in the public API.
This isn't a very strong precedent, but TableBatchReader handles batch_size by letting you set it after construction and that feels like an analogue of this.
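For reference, that precedent looks roughly like this (a sketch assuming the arrow::TableBatchReader API, where the chunk size is adjusted after construction via set_chunksize):

```cpp
// Sketch of the TableBatchReader precedent: the batch size is configured
// after construction rather than passed to the constructor.
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <arrow/table.h>

arrow::Status ConsumeInBatches(const std::shared_ptr<arrow::Table>& table) {
  arrow::TableBatchReader reader(*table);
  reader.set_chunksize(1024);  // set the maximum batch size post-construction
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
    if (batch == nullptr) break;  // end of the table
    // ... consume batch ...
  }
  return arrow::Status::OK();
}
```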
Once SyncScanner goes away we could probably change InMemoryFragment::record_batches_ to InMemoryFragment::record_batch_. This reflects the spirit of "getting rid of scan tasks" better anyways.
I capitulated and removed the argument. Your comment about it just being a testing parameter is accurate. I created a test in arrow-dataset-file-test that does not rely on InMemoryDataset to test this logic here. I might in the future add some tests to ScannerTest that set a limit on scan options batch size to get coverage of the multiple batches per fragment case.
cpp/src/arrow/dataset/scanner.cc (outdated)
Could this be MakeMergedGenerator?
It will need to be. The problem is that MakeMergedGenerator is immediately consuming EnumeratingGenerator which is not async-reentrant. MakeMergedGenerator (erroneously) pulls from the outer (the gen_gen) generator in an async-reentrant fashion. I'll make a follow-up JIRA just to keep this one simple.
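For context, a rough sketch of the shape being discussed follows; the exact signature and reentrancy requirements of MakeMergedGenerator should be checked against arrow/util/async_generator.h for the Arrow version in question, and the helper name below is made up:

```cpp
// Rough sketch only: flattening a "generator of batch generators" (one inner
// generator per fragment) into a single stream of batches. MakeMergedGenerator
// interleaves results from several inner generators, which is why it pulls
// from the outer generator in an async-reentrant fashion.
#include <arrow/record_batch.h>
#include <arrow/util/async_generator.h>

using RecordBatchGenerator =
    arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>;

RecordBatchGenerator FlattenFragmentBatches(
    arrow::AsyncGenerator<RecordBatchGenerator> batch_gen_gen) {
  // max_subscriptions controls how many inner generators are consumed at once.
  return arrow::MakeMergedGenerator(std::move(batch_gen_gen),
                                    /*max_subscriptions=*/4);
}
```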
Force-pushed from 7c93e1e to d372f45
After this lands I can rebase and implement ScanBatchesAsync for IPC/Parquet and give that another test.
…. This ends up creating multiple scan tasks per fragment for the sync case.
Force-pushed from d4669c1 to 42ba11f
I rebased @lidavidm's latest changes. At this point I don't think there are any more outstanding dataset PRs to rebase, so this one is probably ready to merge if it passes review.
lidavidm left a comment
I think this is ready, just a couple more minor things.
cpp/src/arrow/dataset/scanner.h (outdated)
constexpr int32_t kDefaultBatchReadahead = 32;
constexpr int32_t kDefaultFragmentReadahead = 8;

using FragmentGenerator = std::function<Future<std::shared_ptr<Fragment>>()>;
I think moving the subclass definitions means you can also move this alias and get rid of the async_generator.h include.
I was able to move FragmentGenerator but async_generator.h was still needed for Enumerated which is a pity since Enumerated is small and self-contained. Should I place it in its own file?
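For reference, Enumerated is roughly a wrapper of this shape (paraphrased, not the exact definition from async_generator.h):

```cpp
// Approximate shape of the Enumerated wrapper referred to above: it tags each
// generated value with its position and whether it is the last one.
template <typename T>
struct Enumerated {
  T value;
  int index;
  bool last;
};
```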
Ah, that's alright then.
…an executor I don't think it's really something we want the user specifying after all
I'll merge on green unless somebody else wants to take a look.
Integration build: usual issues
Adds a naive implementation of AsyncScanner, which is different from SyncScanner in a few ways:
- It does not use ScanTask and instead relies on Fragment::ScanBatchesAsync, which returns a RecordBatchGenerator.
- ToTable.

It is "naive" because this PR does not add a complete implementation for FileFragment::ScanBatchesAsync. That method relies on FileFormat::ScanBatchesAsync (in the same way that FileFragment::Scan relies on FileFormat::ScanFile). FileFormat::ScanBatchesAsync should be overridden in each of the formats (to rely on an async reader), but it is not (yet).

As a result, the performance of AsyncScanner is poor since it does not do any "per-file" parallelism, nor does it do any "per-batch" parallelism. Follow-up tasks are ARROW-12355 (CSV), ARROW-11772 (IPC), and ARROW-11843 (Parquet).

In addition, this PR is built on top of ARROW-12287, so that will need to be merged first. It will also need to rebase changes from ARROW-12161 and ARROW-11797.
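To illustrate what relying on Fragment::ScanBatchesAsync (which returns a RecordBatchGenerator) means in practice, here is a simplified, hedged sketch of draining such a generator; it blocks on each future for brevity, which a real asynchronous consumer would avoid, and CollectBatches is a made-up helper:

```cpp
// Simplified sketch: a RecordBatchGenerator is a callable that yields a
// Future for each batch; a null batch marks the end of the stream.
#include <memory>
#include <vector>

#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/async_generator.h>

using RecordBatchGenerator =
    arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>;

arrow::Status CollectBatches(
    RecordBatchGenerator gen,
    std::vector<std::shared_ptr<arrow::RecordBatch>>* out) {
  while (true) {
    // Request the next batch and (for brevity in this sketch) block on it.
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch =
        gen().result();
    ARROW_RETURN_NOT_OK(maybe_batch.status());
    std::shared_ptr<arrow::RecordBatch> batch = *maybe_batch;
    if (batch == nullptr) break;  // end-of-stream marker
    out->push_back(std::move(batch));
  }
  return arrow::Status::OK();
}
```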