-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-7001: [C++] Develop threading APIs to accommodate nested parallelism #9607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
a875631 to
9044db2
Compare
ce01a15 to
7f525e8
Compare
7f525e8 to
df34e3a
Compare
e399e7d to
8a30438
Compare
4b51568 to
b1cfa66
Compare
5082cab to
4581b6a
Compare
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Frankly I probably need to give this a second pass but overall this looks reasonable. I left mostly minor comments.
cpp/src/arrow/dataset/dataset.h
Outdated
|
|
||
| protected: | ||
| Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override; | ||
| Future<FragmentVector> GetFragmentsImpl(Expression predicate) override; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be suboptimal for the case of writing datasets. ARROW-10882/#9802 backs an InMemoryDataset with a RecordBatchReader to support passing in Python generators; if we have to materialize all fragments up front that would defeat the purpose.
Though, instead, we could implement a lazy InMemoryFragment instead of the current approach which materializes an InMemoryFragment for each batch in the source, so this doesn't block this PR per se (we just need to rework the other PR a bit once this lands).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to let this be addressed by ARROW-8163.
However, if we're getting ARROW-10882 done first then rather than go with an InMemoryFragment approach I'd recommend switching to AsyncGenerator at that point since we're going to do so sooner or later anyways.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it hopefully won't be too much work. The scanner already consumes the FragmentVector as a generator so the only change needed would be to push the MakeVectorGenerator down into the file formats.
35903de to
857f349
Compare
ARROW-7001: First stab at converting datasets logic to async ARROW-7001: Fixed a bunch of .result()'s in unit tests that weren't really valid (returning a reference to something then deleted) ARROW-7001: Missed on change during rebase ARROW-7001: Renamed ScanSync to Scan and ExecuteSync to Execute to preserve the old mirror APIs until the public bindings can be removed ARROW-7001: Added a few more mirror APIs to get python build working ARROW-7001: WIP ARROW-7001: Various WIP ARROW-7001: WIP ARROW-7001: First stab at converting datasets logic to async ARROW-7001: Fixed a bunch of .result()'s in unit tests that weren't really valid (returning a reference to something then deleted) ARROW-7001: Renamed ScanSync to Scan and ExecuteSync to Execute to preserve the old mirror APIs until the public bindings can be removed ARROW-7001: Added a few more mirror APIs to get python build working ARROW-7001: WIP ARROW-7001: Various WIP ARROW-7001: Minor fixes to get semantics right ARROW-7001: Cleanup ARROW-7001: Fixing some compile errors after rebase ARROW-7001: Fixing errors from rebase ARROW-7001: Added a test for reordering datasets. Removed old concept of splittable. Fixed bug where file errors may not pass through ARROW-7001: Somewhere in the rebasing I lost the 1-arg ScannerBuilder constructor. Added it back in and created a unit test for it for good measure. ARROW-7001: Removing a ... to see if it removes illegal instruction on mac ARROW-7001: Fixed a potential memory issue in the preserve ordering test ARROW-7001: lint ARROW-7001: Changed from using optional<bool> which isn't allowed to just returning the scan task in Scanner::ToTableAsync::table_building_task ARROW-7001: Removed the forced transfer as it was not truly doing anything ARROW-7001: The CSV scan task was doing a read on the CPU thread pool and it was preventing the async chain from getting setup immediately slowing things down. In addition, the later readahead buffers need to be larger to prevent the CPU thread from idling when things arrive out of order. ARROW-7001: Need to put the impl for Scanner::ToTable in the cc file so it ends up in the so ARROW-7001: Added a reordering test ARROW-7001: Added ordering to scanner ARROW-7001: Converted Future<Generator> to Generator ARROW-7001: File readahead was not working correctly and to fix it required quite an overhaul of the scanner but, on the bright side, performance is better on I/O bound tasks ARROW-7001: Fix failing unit test ARROW-7001: Cleaned up lint. Deprecated the old Scan method. Reworked existing logic to adapt ARROW-7001: Removing unused code detected by build ARROW-7001: Moved some code around between header/impl to make MSVC happy. Fixed up a memory leak in a unit test caused by a circular shared_ptr reference
…R code. To address in ARROW-11782 ARROW-7001: Removed incorrect comment from MakeMappedGenerator ARROW-7001: Fixed a regression present when reading IPC fully buffered in memory ARROW-7001: Made the InMemoryDataset creation methods consistent. ARROW-7001: Adding back in (hopefully legacy) constructor for InMemoryScanTask needed by cglib
4d69e88 to
5d48227
Compare
|
@ursabot please benchmark |
|
Benchmark runs are scheduled for baseline = 5554c54 and contender = 5d48227. Results will be available as each benchmark for each run completes: |
|
The work reflected in this PR has been captured in ARROW-12289 (and related JIRAs). This PR is no longer needed. |
This PR ports the dataset/scanner logic to async. It does not actually make any readers (e.g. parquet reader, ipc reader) async. Once the readers are switched to async then they can make use of nested parallelism.
This PR should improve performance on CSV in high latency situations (and the other readers once they are converted).
The scanning order and the scan tasks delivered by
Scanner::Scanwill change as part of this PR (just to be clear, the order in which the files are scanned will change, not the order in which scan tasks are delivered. However, the scan tasks themselves will change, see next note).This PR makes
Scanner::Scana legacy operation. In order to keep backwards compatibility we still return scan tasks but they are in-memory scan tasks around record batches. This means that parquet will generate more scan tasks than it did before (one per record batch instead of one per row group). CSV and IPC will generate many more scan tasks than before. However, the Execute method should be immediate for these new scan tasks. The new, preferred approach, is to useScanner::ScanBatches.This PR also adds
Scanner::ScanAsync,Scanner::ScanUnorderedAsync, andScanner::ToTableAsync. However, there is no need to externally expose those yet.