From 6577dc830307d29369b92fee50527e8161b314e1 Mon Sep 17 00:00:00 2001 From: David Zhu Date: Wed, 29 Jan 2025 14:07:06 -0800 Subject: [PATCH 1/6] Add a constructor to allow passing in footer for InputFiles --- .../parquet/hadoop/ParquetFileReader.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index ae2de87cbc..4683038119 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -729,6 +729,21 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options, return new ParquetFileReader(file, options, f); } + /** + * Open a {@link InputFile file} with {@link ParquetMetadata footer} and {@link ParquetReadOptions options}. + * + * @param file an input file + * @param footer a {@link ParquetMetadata} footer already read from the file + * @param options parquet read options + * @param f the input stream for the file + * @return an open ParquetFileReader + * @throws IOException if there is an error while opening the file + */ + public static ParquetFileReader open(InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f) + throws IOException { + return new ParquetFileReader(file, footer, options, f); + } + protected SeekableInputStream f; private final InputFile file; private final ParquetReadOptions options; @@ -930,6 +945,12 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx this(file, options, file.newStream()); } + /** + * @param file Path to a parquet file + * @param options {@link ParquetReadOptions} + * @param f a {@link SeekableInputStream} for the parquet file + * @throws IOException if the file can not be opened + */ public ParquetFileReader(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = file; @@ -943,6 +964,51 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options, SeekableInp f.close(); throw e; } + + this.fileMetaData = footer.getFileMetaData(); + this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups! + if (null != fileDecryptor && fileDecryptor.plaintextFile()) { + this.fileDecryptor = null; // Plaintext file. No need in decryptor + } + + try { + this.blocks = filterRowGroups(footer.getBlocks()); + } catch (Exception e) { + // In case that filterRowGroups throws an exception in the constructor, the new stream + // should be closed. Otherwise, there's no way to close this outside. + f.close(); + throw e; + } + this.blockIndexStores = listWithNulls(this.blocks.size()); + this.blockRowRanges = listWithNulls(this.blocks.size()); + for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { + paths.put(ColumnPath.get(col.getPath()), col); + } + + if (options.usePageChecksumVerification()) { + this.crc = new CRC32(); + this.crcAllocator = ReusingByteBufferAllocator.strict(options.getAllocator()); + } else { + this.crc = null; + this.crcAllocator = null; + } + } + + /** + * @param file Path to a parquet file + * @param footer a {@link ParquetMetadata} footer already read from the file + * @param options {@link ParquetReadOptions} + * @param f a {@link SeekableInputStream} for the parquet file + * @throws IOException if the file can not be opened + */ + public ParquetFileReader(InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f) + throws IOException { + this.converter = new ParquetMetadataConverter(options); + this.file = file; + this.f = f; + this.options = options; + this.footer = footer; + this.fileMetaData = footer.getFileMetaData(); this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups! if (null != fileDecryptor && fileDecryptor.plaintextFile()) { From 0ea55b7599c667c90d4fb0512ced08e70f603fb0 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 5 Aug 2025 21:02:07 +0800 Subject: [PATCH 2/6] Fix code style --- .../main/java/org/apache/parquet/hadoop/ParquetFileReader.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 4683038119..a6618d7b3a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -739,7 +739,8 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options, * @return an open ParquetFileReader * @throws IOException if there is an error while opening the file */ - public static ParquetFileReader open(InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f) + public static ParquetFileReader open( + InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f) throws IOException { return new ParquetFileReader(file, footer, options, f); } From 303822092677b440ae12eaf7f9d5e9e30e967c6e Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 6 Aug 2025 21:46:35 +0800 Subject: [PATCH 3/6] setRequestedSchema --- .../java/org/apache/parquet/hadoop/ParquetFileReader.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index a6618d7b3a..775053bf88 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1121,13 +1121,17 @@ public List getRowGroups() { return blocks; } - public void setRequestedSchema(MessageType projection) { + public void setRequestedSchema(List columns) { paths.clear(); - for (ColumnDescriptor col : projection.getColumns()) { + for (ColumnDescriptor col : columns) { paths.put(ColumnPath.get(col.getPath()), col); } } + public void setRequestedSchema(MessageType projection) { + setRequestedSchema(projection.getColumns()); + } + public void appendTo(ParquetFileWriter writer) throws IOException { writer.appendRowGroups(f, blocks, true); } From 82daf6886acd8ca915d85e7c7ffc8c6620063c14 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 6 Aug 2025 21:46:40 +0800 Subject: [PATCH 4/6] UT --- .../apache/parquet/hadoop/TestDataPageChecksums.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java index 79a81a5e95..013498c2b4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java @@ -36,6 +36,8 @@ import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.bytes.TrackingByteBufferAllocator; @@ -688,8 +690,13 @@ private int getDataOffset(Page page) { */ private ParquetFileReader getParquetFileReader(Path path, Configuration conf, List columns) throws IOException { - ParquetMetadata footer = ParquetFileReader.readFooter(conf, path); - return new ParquetFileReader(conf, footer.getFileMetaData(), path, footer.getBlocks(), columns); + HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf); + SeekableInputStream inputStream = inputFile.newStream(); + ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build(); + ParquetMetadata footer = ParquetFileReader.readFooter(inputFile, readOptions, inputStream); + ParquetFileReader reader = ParquetFileReader.open(inputFile, footer, readOptions, inputStream); + reader.setRequestedSchema(columns); + return reader; } /** From 9242b3ed2abd716f2058c6a6bd17a941087cc582 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 22 Aug 2025 10:58:48 +0800 Subject: [PATCH 5/6] reduce code duplication --- .../parquet/hadoop/ParquetFileReader.java | 58 ++++++------------- 1 file changed, 17 insertions(+), 41 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 775053bf88..4c148dd796 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -567,8 +567,23 @@ public static final ParquetMetadata readFooter(InputFile file, MetadataFilter fi public static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException { + return readFooter(file, options, f, /*closeStreamOnFailure*/ false); + } + + public static final ParquetMetadata readFooter( + InputFile file, ParquetReadOptions options, SeekableInputStream f, boolean closeStreamOnFailure) + throws IOException { ParquetMetadataConverter converter = new ParquetMetadataConverter(options); - return readFooter(file, options, f, converter); + try { + return readFooter(file, options, f, converter); + } catch (Exception e) { + // In case that readFooter throws an exception in the constructor, the new stream + // should be closed. Otherwise, there's no way to close this outside. + if (closeStreamOnFailure) { + f.close(); + } + throw e; + } } private static final ParquetMetadata readFooter( @@ -953,46 +968,7 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx * @throws IOException if the file can not be opened */ public ParquetFileReader(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException { - this.converter = new ParquetMetadataConverter(options); - this.file = file; - this.f = f; - this.options = options; - try { - this.footer = readFooter(file, options, f, converter); - } catch (Exception e) { - // In case that reading footer throws an exception in the constructor, the new stream - // should be closed. Otherwise, there's no way to close this outside. - f.close(); - throw e; - } - - this.fileMetaData = footer.getFileMetaData(); - this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups! - if (null != fileDecryptor && fileDecryptor.plaintextFile()) { - this.fileDecryptor = null; // Plaintext file. No need in decryptor - } - - try { - this.blocks = filterRowGroups(footer.getBlocks()); - } catch (Exception e) { - // In case that filterRowGroups throws an exception in the constructor, the new stream - // should be closed. Otherwise, there's no way to close this outside. - f.close(); - throw e; - } - this.blockIndexStores = listWithNulls(this.blocks.size()); - this.blockRowRanges = listWithNulls(this.blocks.size()); - for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { - paths.put(ColumnPath.get(col.getPath()), col); - } - - if (options.usePageChecksumVerification()) { - this.crc = new CRC32(); - this.crcAllocator = ReusingByteBufferAllocator.strict(options.getAllocator()); - } else { - this.crc = null; - this.crcAllocator = null; - } + this(file, readFooter(file, options, f, /*closeStreamOnFailure*/ true), options, f); } /** From 7e28a23ee1d6fa7927d8f177871e48b28cbf1c92 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 22 Aug 2025 13:08:37 +0800 Subject: [PATCH 6/6] Update parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java --- .../main/java/org/apache/parquet/hadoop/ParquetFileReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 4c148dd796..2ef39f7804 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -570,7 +570,7 @@ public static final ParquetMetadata readFooter(InputFile file, ParquetReadOption return readFooter(file, options, f, /*closeStreamOnFailure*/ false); } - public static final ParquetMetadata readFooter( + private static final ParquetMetadata readFooter( InputFile file, ParquetReadOptions options, SeekableInputStream f, boolean closeStreamOnFailure) throws IOException { ParquetMetadataConverter converter = new ParquetMetadataConverter(options);