ARROW-17599: [C++] Change the way how arrow reads parquet buffered files #14226
Conversation
westonpace left a comment:
I think I see what you are after. I'm not entirely certain if it works or not. I had kind of thought the extra calls to PreBuffer would happen inside the generator.
cpp/src/parquet/arrow/reader.cc (Outdated)
Here we are calling PreBuffer multiple times. Each time we call it we haven't yet finished reading from the time before. Will this work?
You are right, Weston; it doesn't work.
It seems we need to wait for the prebuffering process to finish (for each parquet row group) before we read each row group. I sent new changes; the relevant part is in RowGroupGenerator::FetchNext (there is a Future::Wait there).
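For context, a minimal sketch of that idea (member names such as `reader_`, `column_indices_`, `io_context_` and `cache_options_` are assumptions, not the literal patch; the actual change lives in RowGroupGenerator::FetchNext in reader.cc):

```cpp
// Sketch only: pre-buffer a single row group and block until its byte
// ranges are cached before kicking off the read of that row group.
::arrow::Future<RecordBatchGenerator> RowGroupGenerator::FetchNext() {
  if (static_cast<size_t>(index_) >= row_groups_.size()) {
    return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
  }
  const int row_group = row_groups_[index_++];
  // PreBuffer returns void; errors surface later when cached ranges are read.
  reader_->parquet_reader()->PreBuffer({row_group}, column_indices_,
                                       io_context_, cache_options_);
  // The Future::Wait mentioned above: don't start reading until the
  // prebuffering for this row group has finished.
  reader_->parquet_reader()->WhenBuffered({row_group}, column_indices_).Wait();
  return ReadOneRowGroup(cpu_executor_, reader_, row_group, column_indices_);
}
```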
cpp/src/parquet/arrow/reader.cc (Outdated)
This seems a bit over-complicated. Can we push the extra calls to PreBuffer down a layer into the RowGroupGenerator itself?
done!
Do you have a test case where we verify that we don't actually retain the RAM when reading the entire file?
Thanks Weston. Do you have an idea about how to write such a test? I think we will still need to keep data in RAM, right? Given that ReadRangeCache::read retains the contents of the buffered data until the reader expires.
Do you think it is a good idea to use TestArrowReadWrite.GetRecordBatchGenerator as a base for writing the new test you are requesting?
Disregard this, please; I'll try to use the memory pool to get the memory stats.
I added a new test called TestArrowReadWrite.ReadShouldNotRetainRam
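The gist of such a check, as a hedged sketch (the real test body may differ; the helper function, its parameter, and the use of ReadTable here are assumptions):

```cpp
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>

// Hedged sketch of a memory-pool based "no retained RAM" check. `file` is
// assumed to point at a multi-row-group parquet file.
arrow::Status CheckReadDoesNotRetainRam(
    std::shared_ptr<arrow::io::RandomAccessFile> file) {
  arrow::MemoryPool* pool = arrow::default_memory_pool();

  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(std::move(file)));
  parquet::arrow::ArrowReaderProperties props;
  props.set_pre_buffer(true);  // exercise the code path this PR changes
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      builder.memory_pool(pool)->properties(props)->Build(&reader));

  const int64_t before = pool->bytes_allocated();
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
  table.reset();  // drop our reference to the decoded data
  // If the reader's cache no longer pins every row group, allocation should
  // fall back toward `before` even while `reader` is still alive.
  const int64_t retained = pool->bytes_allocated() - before;
  return retained == 0
             ? arrow::Status::OK()
             : arrow::Status::Invalid("reader retained ", retained, " bytes");
}
```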
Force-pushed from f74bf85 to f7a07ba.
I also experimented with the python script provided in this related/duplicated Jira ticket: https://issues.apache.org/jira/browse/ARROW-17590. Output of the six runs (the dataframe dump is identical across runs and is shown only for the first one):

```
================================
0 rss: 88.390625 MB
1 rss: 1374.640625 MB
pa.total_allocated_bytes 43.61480712890625 MB dt.nbytes 0.0014410018920898438 MB
c0 c1 c2 c3 c4 c5 c6 c7 c8 c9 c10 c11 c12 c13 c14 c15 c16 c17 c18 c19 c20 c21 c22 c23 c24 c25 c26 c27 ... c572 c573 c574 c575 c576 c577 c578 c579 c580 c581 c582 c583 c584 c585 c586 c587 c588 c589 c590 c591 c592 c593 c594 c595 c596 c597 c598 c599
0 125000 ... None None None None None None None None None None None None None None None None None None None None None None None None None None None None
[1 rows x 600 columns]
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Columns: 600 entries, c0 to c599
dtypes: object(600)
memory usage: 23.9 KB
2 rss: 1294.765625 MB
3 rss: 1294.765625 MB
pyarrow 10.0.0.dev4070+gfb087669a.d20221013 pandas 1.5.0 numpy 1.23.3
================================
0 rss: 87.5 MB
1 rss: 728.921875 MB
pa.total_allocated_bytes 9.8636474609375 MB dt.nbytes 0.0014410018920898438 MB
[dataframe dump omitted -- identical to the first run]
2 rss: 731.375 MB
3 rss: 731.375 MB
pyarrow 10.0.0.dev4070+gfb087669a.d20221013 pandas 1.5.0 numpy 1.23.3
================================
0 rss: 87.703125 MB
1 rss: 729.5 MB
pa.total_allocated_bytes 9.8636474609375 MB dt.nbytes 0.0014410018920898438 MB
[dataframe dump omitted -- identical to the first run]
2 rss: 610.328125 MB
3 rss: 610.328125 MB
pyarrow 10.0.0.dev4070+gfb087669a.d20221013 pandas 1.5.0 numpy 1.23.3
================================
0 rss: 87.484375 MB
1 rss: 729.859375 MB
pa.total_allocated_bytes 9.8636474609375 MB dt.nbytes 0.0014410018920898438 MB
[dataframe dump omitted -- identical to the first run]
2 rss: 732.34375 MB
3 rss: 732.34375 MB
pyarrow 10.0.0.dev4072+gc32f988f5.d20221014 pandas 1.5.0 numpy 1.23.3
================================
0 rss: 87.828125 MB
1 rss: 1385.734375 MB
pa.total_allocated_bytes 9.7957763671875 MB dt.nbytes 0.0014410018920898438 MB
[dataframe dump omitted -- identical to the first run]
2 rss: 1538.9375 MB
3 rss: 1546.4375 MB
pyarrow 10.0.0.dev4070+gfb087669a.d20221013 pandas 1.5.0 numpy 1.23.3
================================
0 rss: 87.8125 MB
1 rss: 1431.546875 MB
pa.total_allocated_bytes 9.7957763671875 MB dt.nbytes 0.0014410018920898438 MB
[dataframe dump omitted -- identical to the first run]
2 rss: 1570.390625 MB
3 rss: 1573.8125 MB
pyarrow 10.0.0.dev4070+gfb087669a.d20221013 pandas 1.5.0 numpy 1.23.3
```
Force-pushed from 4a9780d to 25b12ce.
Follow-up ticket for the IPC file reader:
cpp/src/parquet/arrow/reader.cc (Outdated)
Hmm, why are we synchronously blocking, then attaching a callback to the future in the next step? Something seems off here
Thanks David. I got this error when I don't use wait:
ReadRangeCache did not find matching cache entry
It's as if the parquet_reader()->WhenBuffered call is not really waiting for the buffer to be ready before the row group is read.
I'll investigate more what the issue could be here.
It seems that, somehow, when we trigger ReadOneRowGroup for the same row group and it starts reading, the cache entries are empty (even though I could confirm that PreBuffer was called before).
I made a simple change: when we don't transfer the future using
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
it works without wait.
I think that when we transfer the future, the PreBuffer that is running now (in a different thread) cannot finish before the future triggers ReadOneRowGroup (because the future was already transferred to a different thread).
So perhaps we just need a way to make sure PreBuffer is called beforehand, in the same thread as the future.
I sent new changes to avoid the use of Future::Wait (in the main thread) and to run the prebuffering and read operations together in the same thread context (one after the other).
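One plausible shape of that change, as a hedged sketch (not the literal diff; names follow the earlier FetchNext sketch and are assumptions):

```cpp
// Chain the read as a continuation of the buffering future instead of
// blocking a thread with Wait(): the lambda only runs once the row group's
// byte ranges are in the cache, in the same execution context.
reader_->parquet_reader()->PreBuffer({row_group}, column_indices_,
                                     io_context_, cache_options_);
return reader_->parquet_reader()
    ->WhenBuffered({row_group}, column_indices_)
    .Then([=]() {
      return ReadOneRowGroup(cpu_executor_, reader_, row_group,
                             column_indices_);
    });
```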
Force-pushed from 25b12ce to e8186e9.
Force-push commit messages:
- move PreBuffer down into the RowGroupGenerator and draft the new test
- Don't concatenate RecordBatchGenerator and add more ideas for the test
- Avoid keeping the RAM when using the RecordBatchGenerator: Use MakeMappedGenerator instead of MakeConcatenatedGenerator to create the async generator.
- cleaning ...
- format
- concatenate the async generator when we are using prebuffering
- format (I forgot to update archery linters to clang-tools 14)
- prefer concatenation instead to not break parquet pytest
- fix some issues using arrow::Future
- force wait the buffering finish before read (all of this is async)
Force-pushed from e8186e9 to 2548cab.
There are some CI errors that seem unrelated, and the CI job for macOS 11 was canceled (not sure why).

```
debug/arrow-dataset-file-parquet-test.exe
[----------] Global test environment tear-down
[==========] 44 tests from 3 test suites ran. (3093 ms total)
[  PASSED  ] 44 tests.

debug/parquet-reader-test.exe
[----------] Global test environment tear-down
[==========] 76 tests from 26 test suites ran. (518 ms total)
[  PASSED  ] 71 tests.
[  SKIPPED ] 1 test, listed below:
[  SKIPPED ] TestDumpWithLocalFile.DumpOutput
[  FAILED  ] 4 tests, listed below:
[  FAILED  ] TestBooleanRLE.TestBooleanScanner
[  FAILED  ] TestBooleanRLE.TestBatchRead
[  FAILED  ] TestTextDeltaLengthByteArray.TestTextScanner
[  FAILED  ] TestTextDeltaLengthByteArray.TestBatchRead
 4 FAILED TESTS

debug/parquet-arrow-internals-test.exe
[----------] Global test environment tear-down
[==========] 78 tests from 2 test suites ran. (76 ms total)
[  PASSED  ] 78 tests.

debug/arrow-io-buffered-test.exe
[----------] Global test environment tear-down
[==========] 22 tests from 3 test suites ran. (137 ms total)
[  PASSED  ] 22 tests.
```
I sent a minor change, and I noticed again that the CI job for Windows msys2-mingw64 does not seem stable enough (it could not even start building).
I built on Windows again, this time using the msys2-mingw32 toolchain, and I was able to run the unit tests without issues.
The diff hunk under discussion:

```cpp
END_PARQUET_CATCH_EXCEPTIONS
auto wait_buffer =
    reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
wait_buffer.Wait();
```
Hmm…I think anything calling Wait() in a callback/async context is not going to be right.
I think the issue is that the pre-buffer code doesn't handle concurrent use. The Wait() is effectively just working around that by blocking the thread so that there's no sharing. However, if you attach a reentrant readahead generator to it, I'd guess it'd still fail. So I think either the internals should be refactored so that they handle concurrent use, or we should just create a separate ReadRangeCache per row group. (The advantage of the latter is that you'd have a harder bound on memory usage.)
However, either way this loses 'nice' properties of the original, buffer-entire-file approach (e.g. small row groups can get combined together for I/O). IMO, the longer-term solution would be to disentangle the 'cache' and 'coalesce' behaviors (and possibly even remove the 'cache' behavior, which may make more sense as a wrapper over RandomAccessFile?) and try the approach proposed in the original JIRA, which would be to coalesce ranges, then track when ranges are actually read and remove the buffer from the coalescer once all ranges mapping to a given buffer are read. (The buffer may be kept alive downstream due to shared usage, though.) Or maybe that's still overly fancy.
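For concreteness, the per-row-group cache alternative could look roughly like this, reusing the existing arrow::io::internal::ReadRangeCache (a hedged sketch; the function and its inputs are assumptions, not code from the PR):

```cpp
#include <memory>
#include <vector>

#include <arrow/io/caching.h>
#include <arrow/io/interfaces.h>

// Give each row group its own ReadRangeCache; dropping the cache after the
// row group is read releases its buffers, bounding memory to roughly one
// row group at a time.
arrow::Status ReadRowGroupWithOwnCache(
    std::shared_ptr<arrow::io::RandomAccessFile> file,
    const arrow::io::IOContext& io_context,
    std::vector<arrow::io::ReadRange> ranges_for_row_group) {
  auto cache = std::make_unique<arrow::io::internal::ReadRangeCache>(
      file, io_context, arrow::io::CacheOptions::Defaults());
  ARROW_RETURN_NOT_OK(cache->Cache(std::move(ranges_for_row_group)));
  // ... decode this row group via cache->Read(range) calls ...
  cache.reset();  // this row group's cached buffers can now be freed
  return arrow::Status::OK();
}
```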
Thanks David, I'll close this PR in favor of https://issues.apache.org/jira/browse/ARROW-18113
Closing this PR in favor of https://issues.apache.org/jira/browse/ARROW-18113
Jira ticket: https://issues.apache.org/jira/browse/ARROW-17599
Given that the API of ReadRangeCache::read retains the buffer handles until the end of the file reader's life, we need to change the way the parquet reader reads buffered data; this is a potential solution to avoid loading all the row groups in memory.

There are historical reasons for the current design of ReadRangeCache::read, so this PR will not change that API. Instead, it changes the way we use the pre-buffering process for reading parquet files (there will be a similar PR later to change the behavior of the IPC reader as well).

Additionally, this PR will add:

- A test over ReadRangeCache::read to make sure ReadRangeCache is retaining the memory.
- An update to the API doc for ReadRangeCache::read to indicate that the buffered data outlives reads until the end of the file reader's scope/life.
- Changes to FileReaderImpl::GetRecordBatchGenerator (a short sketch of the retention behavior this works around follows below).
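As an illustration of the retention behavior described above, here is a hedged, self-contained demo against arrow::io::internal::ReadRangeCache (the function name and the sizes are made up for the example):

```cpp
#include <string>

#include <arrow/api.h>
#include <arrow/io/caching.h>
#include <arrow/io/memory.h>

// Once a range is cached, ReadRangeCache hands out buffers backed by its
// cache, and those cache entries stay allocated for as long as the cache
// (here: as long as the file reader that owns it) is alive.
arrow::Status RetentionDemo() {
  auto file = std::make_shared<arrow::io::BufferReader>(
      arrow::Buffer::FromString(std::string(1024, 'x')));
  arrow::io::internal::ReadRangeCache cache(
      file, arrow::io::IOContext(), arrow::io::CacheOptions::Defaults());
  ARROW_RETURN_NOT_OK(cache.Cache({{0, 512}}));
  ARROW_ASSIGN_OR_RAISE(auto buf, cache.Read({0, 256}));
  // `buf` is a slice of the 512-byte cache entry; that entry is not freed
  // after this Read -- it lives until `cache` is destroyed. This is the
  // behavior the PR works around for parquet row groups.
  return arrow::Status::OK();
}
```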