-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-12288: [C++] Create Scanner interface #9947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@ursabot please benchmark |
|
Benchmark runs are scheduled for baseline = 95ca4f5 and contender = bc1b8f0a7b6d36273ce815c288f59d96000a14cb. Results will be available as each benchmark for each run completes: |
|
Benchmark runs are scheduled for baseline = 95ca4f5 and contender = 09e132a0c6378d80a1fd33cb82266449420e2397. Results will be available as each benchmark for each run completes: |
09e132a to
90f86d2
Compare
|
Benchmark runs are scheduled for baseline = 6ddaaa8 and contender = 90f86d284b0640257f18910a26a95bbab9f18cf1. Results will be available as each benchmark for each run completes: |
fsaintjacques
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice cleanup.
cpp/src/arrow/dataset/scanner.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll backport this into #9589 since they need to be synced up at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about that. If you want to land #9589 first I can rebase this on top of that. I'm pretty sure the two are fairly compatible although there is some work to adjust to the new structs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've already backported it so whichever one gets reviewed first should get merged first :) (though I'm about to go back and remove operator== as per your point below.)
cpp/src/arrow/dataset/scanner.cc
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall this state pattern is common enough for iterators/async generators that I feel like we should encapsulate it instead of defining it ad-hoc every time. (Not for this PR, but as a future task.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already done for generators (#9945). It's a bit trickier here with the iterator here since it's enumerating both fragments and record batches at the same time. In the async version we enumerate them separately so it made more sense to have a general utility. Although this comment made me realize this should be named EnumeratingIterator and not TaggingIterator (I'm using "tagging" to refer to attaching the fragment to the record batch which is not what I'm doing here).
|
Should we actually merge this before ARROW-11797? Since that PR adds some more methods to the interface. Plus then we can kick things off in parallel. |
|
Benchmark runs are scheduled for baseline = 6ddaaa8 and contender = 451e238e4f05cbbae1f11cbb5a8dd2e49ff68267. Results will be available as each benchmark for each run completes: |
|
Benchmark runs are scheduled for baseline = 6ddaaa8 and contender = 4500bc1afbcd06312c65375b5b16ae7a1f4bc879. Results will be available as each benchmark for each run completes: |
4500bc1 to
f88a917
Compare
|
Benchmark runs are scheduled for baseline = c0ce2b1 and contender = f88a9172bb7ace47efc83a2eb53c895d7fa5d958. Results will be available as each benchmark for each run completes: |
|
This needs rebasing & I'll take a final look but otherwise I think we can get this at least into 4.0? And then we can hopefully also get the deprecation of Scan() in, which should set us up for 5.0.0. |
Agreed, I don't think we need to worry about deprecating this one. |
bkietz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor comments, thanks for doing this:
cpp/src/arrow/dataset/test_util.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
| void AssertBatchEquals(RecordBatchReader* expected, RecordBatch* batch) { | |
| void AssertBatchEquals(RecordBatchReader* expected, const RecordBatch& batch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Although why isn't RecordBatchReader passed by reference? Is it simply the fact that RecordBatch is const?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We generally favor pointers over non-const reference parameters, IIRC to make it clear what is used mutably, but I can't find the reference for this (I had thought it was in the Google C++ Style Guide).
cpp/src/arrow/dataset/file_base.cc
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to remove parallel execution of batch partitioning. If that's intended then please call it out and add a comment with the follow up JIRA noted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had taken out the equivalent from #9589/ARROW-11797 but I can restore it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I rebased I restored the old version which relies on Scan (@lidavidm maybe the easiest thing to do is just add the pragma avoiding warning for deprecation. I'll add some kind of switch for the async version to use ScanBatches).
However, the old version created one scan task per fragment which relied on Scanner::GetFragments (which is actually the only reason I modified this function in this PR at all).
So when restoring the older version it just calls Scan and collects the iterator into a vector. My guess is the older version was a hangover from the older version of dataset write which didn't use the scanner? @bkietz do you mind thinking this through?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this wasn't purely to put the parallel partitioning back in. The ScanBatches implementation introduced the ever-annoying nested parallelism back in. I had avoided it in #9892 by converting synchronous scan task execution into a future (using TaskGroup::FinishAsync) and then combining that future with any asynchronous scan task executions. I bring this up because @lidavidm is probably going to run into the same problem with ToTable when rebasing ARROW-12208.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In ARROW-11797 I renamed Scan to ScanInternal and made it private for this purpose so I'll fix that once merged. (I think this can/should merge first since it's gone through review.)
…f warnings and ARROW-11797 is already dealing with that
…s and not directly from scanner
f88a917 to
9fbeb3c
Compare
|
Benchmark runs are scheduled for baseline = 745cdb6 and contender = 9fbeb3c. Results will be available as each benchmark for each run completes: |
|
Benchmark runs are scheduled for baseline = 745cdb6 and contender = 7c42b63. Results will be available as each benchmark for each run completes: |
|
Benchmark runs are scheduled for baseline = 745cdb6 and contender = 3690029. Results will be available as each benchmark for each run completes: |
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two test failures look unrelated (MinIO failing to initialize on Windows and Maven encountering a network hiccup).
To prepare for the AsyncScanner this PR creates a Scanner interface and, along the way, simplifies the current Scanner API so that the new scanner won't need to match.
What is removed:
Scanner::GetFragmentswas only used inFileSystemDataset::Write. The correct source of truth for fragments is theDataset. Note: The python implementation exposed this method but it was not documented or used in any unit test. I think it can be safely removed and we need not worry about deprecation.Scanner::schemais redundant and ambiguous. There are two schemas at the scan level. The dataset schema (the unified master schema that we expect all fragment schemas to be a subset of) and the projection schema (a combination of the dataset schema and the projection expression). Both of these are available on the scan options object and there is an accessor for these options so the caller might as well get them from there. This schema function was exposed via R and used internally there but I think any uses can be easily changed to using the options.FileFormat::splittableandFragment::splittable. These were intended to advertise that batch readahead was available on the given fragment/format. However, there is no need to advertise this. They are not used by theSyncScannerand theAsyncScannerwill just assume that the format/fragment's will utilize readahead if they can (respecting the readahead options inScanOptions)Scanner. AllScannercreation should go throughScannerBuildernow. This allows theScannerBuilderto determine what implementation to use. This was mostly the way things were implemented already. Only a few tests instantiated aScannerdirectly.What is deprecated
Scanner::Scanis going to be deprecated (ARROW-11797). It will not be implemented byAsyncScanner. I do not actually deprecate it in this PR as I reserve that for ARROW-11797. Unfortunately, this method was exposed via python & R and likely was used so deprecation is recommended over outright removal.What is new
Scanner::ScanBatchesandScanner::ScanBatchesUnorderedhave been added. These functions will be the new preferred "scan" method going forward. This allows the parallelization (batch readahead, file readahead, etc.) to be handled by C++ and simplifies the user's life.ScanOptions::batch_readaheadandScanOptions::fragment_readaheadoptions allow more fine grained control over how to perform readahead. One technicality is that these options will not be respected well by theSyncScanner(although I think the current ARROW-11797 utilizes batch readahead) so they are more placeholders for when we implementAsyncScanner.ScanOptions::cpu_executorandScanOptions::io_contextare added and should be fairly self explanatory.ScanOptions::use_asyncwill toggle which scanner to use.