PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader #945
Conversation
Force-pushed 8fbd061 to 37faea9
parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@shangxinli @gszadovszky Please review the changes when you get a chance. Thanks!
Can you squash the commits to make the review easier?
Force-pushed 37faea9 to 0fbc608
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@Override
public Optional<Long> getRowIndexOffset() {
  return Optional.of(rowIndexOffset);
If the constructor caller cannot have a valid rowIndexOffset, I guess we need to provide an option to return empty.
done.
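For context, a minimal sketch of what was addressed here, assuming a page-read-store-like class with an extra constructor for callers that have no valid offset; the class and field names are illustrative, not the exact code merged in this PR:

```java
import java.util.Optional;

class RowGroupPageStoreSketch {
  // -1 marks "offset unknown", for callers that cannot supply a valid value (assumption).
  private final long rowIndexOffset;

  RowGroupPageStoreSketch() {
    this(-1); // caller has no valid rowIndexOffset
  }

  RowGroupPageStoreSketch(long rowIndexOffset) {
    this.rowIndexOffset = rowIndexOffset;
  }

  public Optional<Long> getRowIndexOffset() {
    // Return empty instead of wrapping a meaningless offset.
    return rowIndexOffset < 0 ? Optional.empty() : Optional.of(rowIndexOffset);
  }
}
```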
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
 * Returns the ROW_INDEX of the current row.
 */
public long getCurrentRowIndex() {
  if (current == 0L) {
What is the reason for not returning -1?
current is an existing variable which tracks the number of rows already processed. It is initialized to 0 at declaration time. So here we are checking whether it is still 0, which means we haven't processed any row yet.
I understand why we are checking 'current == 0L'. I was asking why you chose to throw an exception rather than returning an invalid value. This is a public method. We should have it documented either way you choose.
There was no specific reason for choosing exception over -1.
I have updated it to return -1 and also updated all the public method docs to reflect the same.
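For readers following along, a small sketch of the updated behavior (the `current` counter is the existing field discussed above; the wrapper class and `currentRowIdx` field are illustrative):

```java
class RowIndexTrackerSketch {
  private long current = 0;          // number of rows already processed (starts at 0)
  private long currentRowIdx = -1;   // row index of the last read row

  public long getCurrentRowIndex() {
    if (current == 0L) {
      return -1;                     // no row processed yet, so no valid row index
    }
    return currentRowIdx;
  }
}
```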
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
We need more tests to cover old parquet data that doesn't have column indexes.
@shangxinli Thanks a lot for the review. I have addressed most of the comments.
I couldn't find any existing tests or existing parquet files in the resources directory which don't have column indexes. Could you please give some pointers to a similar existing test, or some way to create a parquet file without column indexes (I don't see any option to disable writing column indexes either)?
Force-pushed 9fe1bd6 to 470998c
I will have another look sometime this week.
@shangxinli I have added a test to cover old parquet files without column indexes. Please review the changes when you get a chance.
public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
    InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException {
  return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, generateRowGroupOffsets(parquetMetadata));
As you mentioned above, if parquetMetadata is a filtered one, then generateRowGroupOffsets() won't return accurate offsets, correct?
Yes, that's correct. Fixed this: now we pass an empty Map so that we don't populate incorrect rowIndexOffsets.
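A sketch of the idea behind the fix, assuming a helper that derives each row group's first row index from the unfiltered footer; when the metadata may have been filtered, an empty map is passed instead so no incorrect offsets get populated. The helper signature here is illustrative, not the exact one in ParquetMetadataConverter:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RowGroupOffsetSketch {
  // Derive the first (file-wide) row index of each row group from its row count.
  // This is only valid when the footer has not been filtered.
  static Map<Integer, Long> generateRowGroupOffsets(List<Long> rowGroupRowCounts) {
    Map<Integer, Long> offsets = new HashMap<>();
    long firstRowIndex = 0;
    for (int ordinal = 0; ordinal < rowGroupRowCounts.size(); ordinal++) {
      offsets.put(ordinal, firstRowIndex);
      firstRowIndex += rowGroupRowCounts.get(ordinal);
    }
    return offsets;
  }

  // For filtered metadata the ordinals no longer line up with the file,
  // so pass an empty map and leave rowIndexOffset unpopulated.
  static Map<Integer, Long> offsetsForFilteredMetadata() {
    return Collections.emptyMap();
  }
}
```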
private long totalByteSize;
private String path;
private int ordinal;
private long rowIndexOffset;
In the following toString(), it should be added too.
done.
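A sketch of the toString() addition the comment asks for, reusing the fields from the snippet above; the exact formatting in the merged code may differ:

```java
class BlockMetaDataSketch {
  private long totalByteSize;
  private String path;
  private int ordinal;
  private long rowIndexOffset;

  @Override
  public String toString() {
    // rowIndexOffset is now included alongside the existing fields.
    return "BlockMetaData{path: " + path + ", totalByteSize: " + totalByteSize
        + ", ordinal: " + ordinal + ", rowIndexOffset: " + rowIndexOffset + "}";
  }
}
```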
try {
  currentValue = recordReader.read();
  if (rowIdxInFileItr != null) {
&& hasNext()?
done.
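A sketch of the suggested guard, assuming `rowIdxInFileItr` is a PrimitiveIterator.OfLong over file-wide row indexes; the wrapper class and method are illustrative:

```java
import java.util.PrimitiveIterator;

class RowIndexAdvanceSketch {
  private long currentRowIdx = -1;

  // Per the review suggestion: only advance when the iterator exists AND still has elements.
  void advance(PrimitiveIterator.OfLong rowIdxInFileItr) {
    if (rowIdxInFileItr != null && rowIdxInFileItr.hasNext()) {
      currentRowIdx = rowIdxInFileItr.nextLong();
    }
  }
}
```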
if (pages.getRowIndexes().isPresent()) {
  rowIdxInRowGroupItr = pages.getRowIndexes().get();
} else {
  // If `pages.getRowIndexes()` is empty, this means column indexing has not triggered.
The name 'column index' is already used for the Page Index feature. Can you use something else?
removed this code comment.
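For context, a minimal sketch of one way the else branch can behave, assuming that when the page store exposes no row indexes (i.e. no rows were filtered out of the row group) the indexes are simply the sequential range 0..rowCount-1; this is an assumption about the fallback, not necessarily the merged code:

```java
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

class RowIndexFallbackSketch {
  static PrimitiveIterator.OfLong rowIndexesFor(
      Optional<PrimitiveIterator.OfLong> fromPages, long rowCountInRowGroup) {
    // Use the indexes provided by the page store when present (rows were filtered);
    // otherwise fall back to the full sequential range for this row group.
    return fromPages.orElseGet(() -> LongStream.range(0, rowCountInRowGroup).iterator());
  }
}
```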
@shangxinli Thanks a lot for the review. I have addressed the review comments. Could you please look into the PR again? Also, could you share when we are planning to do the code freeze for the next minor release? It would be great if we can release this change in the next minor/patch release so that Apache Spark and other projects get to use this functionality sooner.
@shangxinli Gentle reminder. Please take a look when you get a chance.
/**
 * Returns the row index of the last read row. If no row has been processed, returns -1.
Given this is a public method, we need to take care of the Javadoc decorations. Please refer to other methods in this class and follow the same.
fixed.
private static final Path FILE_V1 = createTempFile();
private static final Path FILE_V2 = createTempFile();
private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(10000));
private static final Path STATIC_FILE_WITHOUT_COL_INDEXES = createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
I am not sure if it is a good idea to check in a data file. Can you check if it is possible to stop generating offset index in the current version of Parquet?
@shangxinli It looks like column indexes are always written in the current version of parquet and are not configurable.
We are already testing the new row index support with and without column index filtering being triggered (as part of TestColumnIndexFiltering). Also, the new row index feature doesn't rely on column indexes in any way. So we could skip the backward-compatibility testing and remove this parquet file from resources. What do you think?
I just left some comments. Other than that, it looks good to me. Adding @ggershinsky in case you have time to have a look. Beyond this PR, if the work you are doing in Iceberg/Spark can be done in Parquet, please consider adding it to parquet-mr. With that, it can benefit all the applications that need parquet-mr.
Hi guys, I'm OOO (vacation) this week. I can review it next week if that helps, but feel free to go ahead without waiting for me.
@shangxinli Thanks for taking another look. I have addressed all comments other than one. Please advise on the same. Thanks!
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
Force-pushed f9cacb8 to 19caf21
Thanks for this change. The PR looks good to me now; I'll add my approval after it passes the CI tests.
@prakharjain09 After you fix the CI failures, we can merge.
…eader, also expose the row index via ParquetReader or ParquetRecordReader
- Add and populate rowIndexOffset field in BlockMetaData
- Changes to generate row index in InternalParquetRecordReader, also expose the row index via ParquetReader or ParquetRecordReader
- Add new unit tests and extend all the ColumnIndexFiltering and BloomFiltering unit tests to validate row indexes also.
…, document the same, Return -1 when rowIndexOffset info not available in BlockMetadata
Force-pushed 19caf21 to 015f967
Thanks @ggershinsky for the review. I have addressed the comments and fixed the build issue.
@shangxinli @ggershinsky Thanks a lot for reviewing this change. This will unblock SPARK-37980 if it is released as part of the upcoming parquet release. Do we need to cherry-pick this to any release branch for that?
@prakharjain09 the upcoming parquet release will include the current master (plus a couple of WIP PRs, once they are merged), so this patch will be covered.
@shangxinli @ggershinsky Is there any tentative date or rough estimate for when we are planning to do the RC cut for the next release?
@prakharjain09 hopefully, we'll resolve the remaining issues at the community sync tomorrow, and start working on a cut. |
Make sure you have checked all steps below.
Jira
Tests
Added TestParquetReader, which covers rowIndex-related tests for different kinds of filters.
Also extended all the ColumnIndexFiltering and BloomFiltering tests to validate the row index as well. This adds unit test coverage for the following scenarios: Parquet V1/V2, with encryption on/off, with no filter/simple filter/column-index filter/bloom filter.
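As a usage sketch for reviewers: once this PR lands, a consumer could read the row index alongside each record roughly as below. `getCurrentRowIndex()` is the API exposed by this PR; the file path and Group-based reader setup are just illustrative.

```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class RowIndexReadExample {
  public static void main(String[] args) throws Exception {
    // Path is illustrative; any file readable with the example GroupReadSupport works.
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/phonebook.parquet")).build()) {
      Group record;
      while ((record = reader.read()) != null) {
        // The file-wide index of the row just returned by read(), or -1 if nothing was read yet.
        long rowIndex = reader.getCurrentRowIndex();
        System.out.println(rowIndex + ": " + record);
      }
    }
  }
}
```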
Commits
Documentation