ARROW-11889: [C++] Add parallelism to streaming CSV reader #10568
Conversation
Force-pushed from acde4e5 to 7cd6aed
@pitrou @n3world This should probably wait until after ARROW-12996 is merged in. I think it'd be easier to rebase ARROW-12996 into this PR than the other way around.
Force-pushed from 8b8c812 to 9d8dc07
I've rebased in the changes from #10509. The behavior is only slightly different: opening the streaming CSV reader reads in the first record batch, so bytes_read will reflect that before any batch has been consumed. After that, each time a batch is returned the next batch is read in, so the read call itself does not increment bytes_read. If reading in parallel, bytes_read could be even further ahead of the consumer, since decoding happens in the readahead. It should still match the spirit of the feature, which is to report how many bytes have been decoded. @n3world @pitrou review is welcome. The CI failure is unrelated.
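For illustration, here is a minimal sketch of how this looks from user code, assuming a local file at a placeholder path `data.csv`; with `use_threads` enabled, `bytes_read()` may run ahead of the batch just returned:

```cpp
#include <iostream>
#include <memory>

#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>

// Sketch only: "data.csv" is a placeholder path.
arrow::Status ReadCsvStreaming() {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open("data.csv"));

  auto read_options = arrow::csv::ReadOptions::Defaults();
  read_options.use_threads = true;  // enable the parallel readahead

  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
                                        read_options,
                                        arrow::csv::ParseOptions::Defaults(),
                                        arrow::csv::ConvertOptions::Defaults()));

  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // Because of readahead, this may already include bytes decoded for
    // batches beyond the one just returned.
    std::cout << "bytes decoded so far: " << reader->bytes_read() << std::endl;
  }
  return arrow::Status::OK();
}
```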
cpp/src/arrow/csv/reader_test.cc (outdated)
I would say this changes the intent I had for bytes_read() when threads are used. The goal was to be able to report progress along with the batch, so that after a batch was retrieved with ReadNext(), bytes_read() could be used to calculate the progress of that batch. In this example the second-to-last batch would be calculated as 100% complete, and this can become more skewed with more readahead and parallel processing. However, with the futures you never know when the record batch is retrieved from the future, making it impossible for bytes_read() to work that way.
My only thought on how to solve this would be to have ReadNextAsync() or a similar new method return a Future of a pair where one of the values is the bytes read, so that anybody who actually wants to associate progress with a batch can use that API.
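Something like the following shape, as a purely hypothetical sketch; `BatchWithBytes` and `ReadNextWithProgress` are illustrative names, not existing Arrow API:

```cpp
#include <cstdint>
#include <memory>

#include <arrow/record_batch.h>
#include <arrow/util/future.h>

// Hypothetical sketch of the proposal above; none of these names exist in
// Arrow. The idea: resolve each future to the batch together with the byte
// count that produced it, so progress stays tied to the batch even when
// readahead has moved bytes_read() further along.
struct BatchWithBytes {
  std::shared_ptr<arrow::RecordBatch> batch;
  int64_t bytes_read;  // bytes decoded to produce this batch
};

class ProgressStreamingReader {
 public:
  virtual ~ProgressStreamingReader() = default;
  // Like ReadNextAsync(), but pairs the batch with its progress counter.
  virtual arrow::Future<BatchWithBytes> ReadNextWithProgress() = 0;
};
```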
I moved the increment of bytes_decoded_ to be after the readahead. So now:
- bytes_decoded_ will not be incremented until the reader asks for the batch
- The header bytes and skip before header are still marked read after Make (I think this is fair as they have been "consumed" by this point)
- Bytes skipped after the header are marked consumed after the first batch is delivered
I think this is close enough to what you are after.
python/pyarrow/tests/test_csv.py (outdated)
Is this still to test SerialStreamingCSV? Should there be two classes so that all tests get run for serial and non-serial?
Ah, good point (and clumsy of me to leave those comments in there). I've changed up the test so now there is no base class and every test is parameterized on use_threads=True/False.
Force-pushed from 86b9649 to cd899de
Thanks for the feedback @n3world. I think I was able to update …
@ursabot please benchmark
Benchmark runs are scheduled for baseline = cf6a7ff and contender = cd899de2debcd7ccb0c8d1e3f7840a3cebf77742. Results will be available as each benchmark for each run completes.
Force-pushed from cd899de to 6d6505a
bkietz left a comment
Just a few comments. Overall this is a nice clarification of the streaming reader.
…ed implementation. The parser and decoder are now operator functions and sequencing logic has been removed from them. Parallel readahead has been added to the streaming reader to allow for parallel streaming CSV reads.
…e commented out column decoder tests
…ad a reference to self which was causing a circular reference. Moved the reference to bytes_decoded itself.
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Force-pushed from 6347b70 to ab9b932
bkietz left a comment
LGTM, thanks for doing this!
This converts the parser & decoder into map functions and then creates the streaming CSV reader as an async generator. Parallel readahead is then added on top of the parser/decoder to allow for parallel reads.
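As a rough illustration of that shape (a conceptual sketch only, not the actual reader internals; `CsvBlock`, `ParsedBlock`, `ParseBlock`, and `DecodeBlock` are hypothetical stand-ins for the blocks and operator functions):

```cpp
#include <memory>

#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/util/async_generator.h>

// Conceptual sketch of the pipeline shape described above, not the actual
// reader internals.
struct CsvBlock { /* a slice of raw CSV bytes */ };
struct ParsedBlock { /* rows/fields located within the block */ };

// Pure map functions: no sequencing logic inside, so they can run on any
// thread as the readahead generator pulls blocks.
arrow::Result<ParsedBlock> ParseBlock(const CsvBlock& block) {
  return ParsedBlock{};  // placeholder for the real parser
}
arrow::Result<std::shared_ptr<arrow::RecordBatch>> DecodeBlock(
    const ParsedBlock& parsed) {
  return std::shared_ptr<arrow::RecordBatch>{};  // placeholder for the decoder
}

arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>> MakeBatchGenerator(
    arrow::AsyncGenerator<CsvBlock> blocks, int max_readahead) {
  auto parsed = arrow::MakeMappedGenerator(std::move(blocks), ParseBlock);
  auto decoded = arrow::MakeMappedGenerator(std::move(parsed), DecodeBlock);
  // Readahead eagerly pulls up to max_readahead decoded batches, letting
  // parse/decode of later blocks overlap with the consumer.
  return arrow::MakeReadaheadGenerator(std::move(decoded), max_readahead);
}
```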
One thing that is lost at the moment is the ability to encounter a parsing error and then continue. There was a Python test that read in the first block, failed to convert the second block, and then successfully read in a third block. I'm not sure if that restart behavior is important, but if it is I can look into adding it.
Another thing that could be investigated in the future is combining the file readers and table readers more. They already share some components, but the parsing and decoding logic, while basically the same, is handled very differently. The only real difference is that the table reader saves all the parsed blocks for re-parsing and the streaming reader does not.