From 4414f4da1287fbb1254d15244c1debe814246f47 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Wed, 13 Mar 2024 11:55:56 +0800 Subject: [PATCH 1/4] cache the file size of data file for reader --- .../paimon/format/FormatReaderFactory.java | 7 ++++- .../io/KeyValueDataFileRecordReader.java | 7 +++-- .../paimon/io/KeyValueFileReaderFactory.java | 9 +++--- .../paimon/io/RowDataFileRecordReader.java | 4 ++- .../operation/AppendOnlyFileStoreRead.java | 1 + .../paimon/io/KeyValueFileReadWriteTest.java | 28 +++++++++++++++++++ .../paimon/format/avro/AvroBulkFormat.java | 4 +-- .../paimon/format/orc/OrcReaderFactory.java | 12 ++++++-- .../format/parquet/ParquetReaderFactory.java | 14 +++++----- .../parquet/ParquetTableStatsExtractor.java | 13 +++++++-- .../format/orc/OrcReaderFactoryTest.java | 6 ++-- .../ParquetTableStatsExtractorTest.java | 1 + 12 files changed, 81 insertions(+), 25 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index b2b179159b8e..7d08fa1dd290 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -31,6 +31,11 @@ public interface FormatReaderFactory extends Serializable { RecordReader createReader(FileIO fileIO, Path file) throws IOException; - RecordReader createReader(FileIO fileIO, Path file, int poolSize) + RecordReader createReader(FileIO fileIO, Path file, Long fileSize) throws IOException; + + default RecordReader createReader( + FileIO fileIO, Path file, int poolSize, Long fileSize) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java index fe38ae146463..f7c6a5b48164 100--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java @@ -58,13 +58,14 @@ public KeyValueDataFileRecordReader( @Nullable Integer poolSize, @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, - @Nullable PartitionInfo partitionInfo) + @Nullable PartitionInfo partitionInfo, + long fileSize) throws IOException { FileUtils.checkExists(fileIO, path); this.reader = poolSize == null - ? readerFactory.createReader(fileIO, path) - : readerFactory.createReader(fileIO, path, poolSize); + ? readerFactory.createReader(fileIO, path, fileSize) + : readerFactory.createReader(fileIO, path, poolSize, fileSize); this.serializer = new KeyValueSerializer(keyType, valueType); this.level = level; this.indexMapping = indexMapping; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 63fef31fc142..849aa95aac9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -92,9 +92,9 @@ public RecordReader createRecordReader( long schemaId, String fileName, long fileSize, int level) throws IOException { if (fileSize >= asyncThreshold && fileName.endsWith("orc")) { return new AsyncRecordReader<>( - () -> createRecordReader(schemaId, fileName, level, false, 2)); + () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize)); } - return createRecordReader(schemaId, fileName, level, true, null); + return createRecordReader(schemaId, fileName, level, true, null, fileSize); } private RecordReader createRecordReader( @@ -102,7 +102,8 @@ private RecordReader createRecordReader( String fileName, int level, boolean reuseFormat, - @Nullable Integer poolSize) + @Nullable Integer poolSize, + long fileSize) throws 
IOException { String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); @@ -130,7 +131,7 @@ private RecordReader createRecordReader( poolSize, bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),fileSize); Optional deletionVector = dvFactory.create(fileName); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get()); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java index ee8f9c26f8b0..4bab8315b42b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java @@ -46,12 +46,14 @@ public class RowDataFileRecordReader implements RecordReader { public RowDataFileRecordReader( FileIO fileIO, Path path, + long fileSize, FormatReaderFactory readerFactory, @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, @Nullable PartitionInfo partitionInfo) throws IOException { - this.reader = FileUtils.createFormatReader(fileIO, readerFactory, path); + FileUtils.checkExists(fileIO, path); + this.reader = readerFactory.createReader(fileIO, path, fileSize); this.indexMapping = indexMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java index c06cce45899a..1e12b9c74684 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java @@ -174,6 +174,7 @@ 
public RecordReader createReader(DataSplit split) throws IOExceptio new RowDataFileRecordReader( fileIO, dataFilePathFactory.toPath(file.fileName()), + file.fileSize(), bulkFormatMapping.getReaderFactory(), bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 05f260097a4d..509677174306 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -48,6 +48,8 @@ import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.HashMap; @@ -385,4 +387,30 @@ private void checkRollingFiles( assertThat(meta.level()).isEqualTo(expected.level()); } } + + @ParameterizedTest + @ValueSource(strings = {"parquet", "orc", "avro"}) + public void testReaderUseFileSizeFromMetadata(String format) throws Exception { + testWriteAndReadDataFileImpl(format); + DataFileTestDataGenerator.Data data = gen.next(); + KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format); + DataFileMetaSerializer serializer = new DataFileMetaSerializer(); + + RollingFileWriter writer = + writerFactory.createRollingMergeTreeFileWriter(0); + writer.write(CloseableIterator.fromList(data.content, kv -> {})); + writer.close(); + List actualMetas = writer.result(); + + KeyValueFileReaderFactory readerFactory = + createReaderFactory(tempDir.toString(), format, null, null); + assertData( + data, + actualMetas, + TestKeyValueGenerator.KEY_SERIALIZER, + TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER, + serializer, + readerFactory, + kv -> kv); + } } diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java index 1c1cfddf1fd2..d5450e59e4f9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java @@ -54,9 +54,9 @@ public RecordReader createReader(FileIO fileIO, Path file) throws I } @Override - public RecordReader createReader(FileIO fileIO, Path file, int poolSize) + public RecordReader createReader(FileIO fileIO, Path file, Long fileSize) throws IOException { - throw new UnsupportedOperationException(); + return createReader(fileIO, file); } private class AvroReader implements RecordReader { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 696665777243..461a15a72042 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -89,11 +89,17 @@ public OrcReaderFactory( @Override public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws IOException { - return createReader(fileIO, file, 1); + return createReader(fileIO, file, 1, null); } @Override - public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize) + public org.apache.paimon.reader.RecordReader createReader( + FileIO fileIO, Path file, Long fileSize) throws IOException { + return createReader(fileIO, file, 1, fileSize); + } + + @Override + public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize, Long fileSize) throws IOException { Pool poolOfBatches = createPoolOfBatches(poolSize); RecordReader orcReader = @@ -104,7 +110,7 @@ public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize) fileIO, file, 0, - 
fileIO.getFileSize(file)); + fileSize == null ? fileIO.getFileSize(file) : fileSize); return new OrcVectorizedReader(orcReader, poolOfBatches); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 2c2985d32ee1..ba9c72649ec3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -88,8 +88,14 @@ public ParquetReaderFactory(Options conf, RowType projectedType, int batchSize) @Override public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOException { + return createReader(fileIO, filePath, null); + } + + @Override + public ParquetReader createReader(FileIO fileIO, Path filePath, Long fileSize) + throws IOException { final long splitOffset = 0; - final long splitLength = fileIO.getFileSize(filePath); + final long splitLength = fileSize == null ? 
fileIO.getFileSize(filePath) : fileSize; ParquetReadOptions.Builder builder = ParquetReadOptions.builder().withRange(splitOffset, splitOffset + splitLength); @@ -108,12 +114,6 @@ public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOExcepti return new ParquetReader(reader, requestedSchema, reader.getRecordCount(), poolOfBatches); } - @Override - public RecordReader createReader(FileIO fileIO, Path file, int poolSize) - throws IOException { - throw new UnsupportedOperationException(); - } - private void setReadOptions(ParquetReadOptions.Builder builder) { builder.useSignedStringMinMax( conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java index 9055f64b0241..467d09b3c3b3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java @@ -84,7 +84,7 @@ public Pair extractWithFileInfo(FileIO fileIO, Path path DataField field = rowType.getFields().get(i); return toFieldStats( field, - statsPair.getLeft().get(field.name()), + statsPair.getLeft(), collectors[i]); }) .toArray(FieldStats[]::new), @@ -92,7 +92,15 @@ public Pair extractWithFileInfo(FileIO fileIO, Path path } private FieldStats toFieldStats( - DataField field, Statistics stats, FieldStatsCollector collector) { + DataField field, Map> fieldPathAndStats, FieldStatsCollector collector) { + Statistics stats; + switch (field.type().getTypeRoot()) { + case ARRAY: + stats = fieldPathAndStats.get(field.name() + ".list.element"); + break; + default: + stats = fieldPathAndStats.get(field.name()); + } if (stats == null) { return new FieldStats(null, null, null); } @@ -177,6 +185,7 @@ private FieldStats toFieldStats( toTimestampStats( stats, 
((LocalZonedTimestampType) field.type()).getPrecision()); break; + case ARRAY: default: fieldStats = new FieldStats(null, null, nullCount); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java index a5160f5c7990..c1bcaf906a42 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java @@ -179,8 +179,9 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException { AtomicBoolean isFirst = new AtomicBoolean(true); + LocalFileIO localFileIO = new LocalFileIO(); try (RecordReader reader = - format.createReader(new LocalFileIO(), flatFile, randomPooSize)) { + format.createReader(localFileIO, flatFile, randomPooSize,localFileIO.getFileSize(flatFile))) { reader.forEachRemainingWithPosition( (rowPosition, row) -> { // check filter: _col0 > randomStart @@ -202,8 +203,9 @@ void testReadRowPositionWithTransformAndFilter() throws IOException { int randomPooSize = new Random().nextInt(3) + 1; OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1}); + LocalFileIO localFileIO = new LocalFileIO(); try (RecordReader reader = - format.createReader(new LocalFileIO(), flatFile, randomPooSize)) { + format.createReader(localFileIO, flatFile, randomPooSize,localFileIO.getFileSize(flatFile))) { reader.transform(row -> row) .filter(row -> row.getInt(1) % 123 == 0) .forEachRemainingWithPosition( diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java index 784833eecdc4..9ac482586996 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java +++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java @@ -88,6 +88,7 @@ protected FieldStats regenerate(FieldStats stats, DataType type) { } break; case ARRAY: + return new FieldStats(null, null, stats.nullCount()); case MAP: case MULTISET: case ROW: From a7c878efb51dcce2537485ef0a8a76846914cfca Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Fri, 15 Mar 2024 10:40:06 +0800 Subject: [PATCH 2/4] support null column count stat for the array datatype of parquet. --- .../format/parquet/ParquetTableStatsExtractor.java | 13 ++----------- .../parquet/ParquetTableStatsExtractorTest.java | 1 - 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java index 467d09b3c3b3..9055f64b0241 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java @@ -84,7 +84,7 @@ public Pair extractWithFileInfo(FileIO fileIO, Path path DataField field = rowType.getFields().get(i); return toFieldStats( field, - statsPair.getLeft(), + statsPair.getLeft().get(field.name()), collectors[i]); }) .toArray(FieldStats[]::new), @@ -92,15 +92,7 @@ public Pair extractWithFileInfo(FileIO fileIO, Path path } private FieldStats toFieldStats( - DataField field, Map> fieldPathAndStats, FieldStatsCollector collector) { - Statistics stats; - switch (field.type().getTypeRoot()) { - case ARRAY: - stats = fieldPathAndStats.get(field.name() + ".list.element"); - break; - default: - stats = fieldPathAndStats.get(field.name()); - } + DataField field, Statistics stats, FieldStatsCollector collector) { if (stats == null) { return new FieldStats(null, null, null); } @@ -185,7 +177,6 @@ private FieldStats toFieldStats( 
toTimestampStats( stats, ((LocalZonedTimestampType) field.type()).getPrecision()); break; - case ARRAY: default: fieldStats = new FieldStats(null, null, nullCount); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java index 9ac482586996..784833eecdc4 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java @@ -88,7 +88,6 @@ protected FieldStats regenerate(FieldStats stats, DataType type) { } break; case ARRAY: - return new FieldStats(null, null, stats.nullCount()); case MAP: case MULTISET: case ROW: From a716834316b9f011d59445daa5477a6a176d23b5 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Tue, 19 Mar 2024 10:43:55 +0800 Subject: [PATCH 3/4] add context --- .../paimon/format/FormatReaderContext.java | 54 +++++++++++++++++++ .../paimon/format/FormatReaderFactory.java | 12 ++--- .../io/KeyValueDataFileRecordReader.java | 6 +-- .../paimon/io/KeyValueFileReaderFactory.java | 3 +- .../paimon/io/RowDataFileRecordReader.java | 4 +- .../paimon/format/avro/AvroBulkFormat.java | 10 ++-- .../paimon/format/orc/OrcReaderFactory.java | 20 +++---- .../format/parquet/ParquetReaderFactory.java | 12 ++--- .../format/orc/OrcReaderFactoryTest.java | 7 ++- 9 files changed, 86 insertions(+), 42 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java new file mode 100644 index 000000000000..b1ad3fa47e1a --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.RecordReader; + +/** The context for creating a {@link RecordReader}. 
*/ +public class FormatReaderContext { + private final FileIO fileIO; + private final Path file; + private final Integer poolSize; + private final Long fileSize; + + public FormatReaderContext(FileIO fileIO, Path file, Integer poolSize, Long fileSize) { + this.fileIO = fileIO; + this.file = file; + this.poolSize = poolSize; + this.fileSize = fileSize; + } + + public FileIO getFileIO() { + return fileIO; + } + + public Path getFile() { + return file; + } + + public Integer getPoolSize() { + return poolSize; + } + + public Long getFileSize() { + return fileSize; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index 7d08fa1dd290..f524ff4a1465 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -29,13 +29,9 @@ /** A factory to create {@link RecordReader} for file. 
*/ public interface FormatReaderFactory extends Serializable { - RecordReader createReader(FileIO fileIO, Path file) throws IOException; - - RecordReader createReader(FileIO fileIO, Path file, Long fileSize) - throws IOException; - - default RecordReader createReader( - FileIO fileIO, Path file, int poolSize, Long fileSize) throws IOException { - throw new UnsupportedOperationException(); + default RecordReader createReader(FileIO fileIO, Path file) throws IOException { + return createReader(new FormatReaderContext(fileIO, file, null, null)); } + + RecordReader createReader(FormatReaderContext context) throws IOException; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java index f7c6a5b48164..4e7dfec9e55f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.PartitionInfo; import org.apache.paimon.data.columnar.ColumnarRowIterator; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -63,9 +64,8 @@ public KeyValueDataFileRecordReader( throws IOException { FileUtils.checkExists(fileIO, path); this.reader = - poolSize == null - ? 
readerFactory.createReader(fileIO, path, fileSize) - : readerFactory.createReader(fileIO, path, poolSize, fileSize); + readerFactory.createReader( + new FormatReaderContext(fileIO, path, poolSize, fileSize)); this.serializer = new KeyValueSerializer(keyType, valueType); this.level = level; this.indexMapping = indexMapping; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 849aa95aac9a..73b312043c75 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -131,7 +131,8 @@ private RecordReader createRecordReader( poolSize, bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),fileSize); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), + fileSize); Optional deletionVector = dvFactory.create(fileName); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get()); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java index 4bab8315b42b..b461ebf0b6b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.PartitionInfo; import org.apache.paimon.data.columnar.ColumnarRowIterator; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -53,7 +54,8 @@ public RowDataFileRecordReader( @Nullable 
PartitionInfo partitionInfo) throws IOException { FileUtils.checkExists(fileIO, path); - this.reader = readerFactory.createReader(fileIO, path, fileSize); + FormatReaderContext context = new FormatReaderContext(fileIO, path, null, fileSize); + this.reader = readerFactory.createReader(context); this.indexMapping = indexMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java index d5450e59e4f9..10fc65ec88de 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.avro; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -49,14 +50,9 @@ public AvroBulkFormat(RowType projectedRowType) { } @Override - public RecordReader createReader(FileIO fileIO, Path file) throws IOException { - return new AvroReader(fileIO, file); - } - - @Override - public RecordReader createReader(FileIO fileIO, Path file, Long fileSize) + public RecordReader createReader(FormatReaderContext formatReaderContext) throws IOException { - return createReader(fileIO, file); + return new AvroReader(formatReaderContext.getFileIO(), formatReaderContext.getFile()); } private class AvroReader implements RecordReader { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 461a15a72042..cdc46139f790 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnarRow; import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem; import org.apache.paimon.format.orc.filter.OrcFilters; @@ -88,20 +89,13 @@ public OrcReaderFactory( // ------------------------------------------------------------------------ @Override - public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws IOException { - return createReader(fileIO, file, 1, null); - } - - @Override - public org.apache.paimon.reader.RecordReader createReader( - FileIO fileIO, Path file, Long fileSize) throws IOException { - return createReader(fileIO, file, 1, fileSize); - } - - @Override - public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize, Long fileSize) - throws IOException { + public OrcVectorizedReader createReader(FormatReaderContext context) throws IOException { + int poolSize = context.getPoolSize() == null ? 
1 : context.getPoolSize(); Pool poolOfBatches = createPoolOfBatches(poolSize); + + FileIO fileIO = context.getFileIO(); + Long fileSize = context.getFileSize(); + Path file = context.getFile(); RecordReader orcReader = createRecordReader( hadoopConfigWrapper.getHadoopConfig(), diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index ba9c72649ec3..29cf45a65260 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.data.columnar.writable.WritableColumnVector; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.parquet.reader.ColumnReader; import org.apache.paimon.format.parquet.reader.ParquetDecimalVector; @@ -87,13 +88,10 @@ public ParquetReaderFactory(Options conf, RowType projectedType, int batchSize) } @Override - public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOException { - return createReader(fileIO, filePath, null); - } - - @Override - public ParquetReader createReader(FileIO fileIO, Path filePath, Long fileSize) - throws IOException { + public ParquetReader createReader(FormatReaderContext context) throws IOException { + Path filePath = context.getFile(); + FileIO fileIO = context.getFileIO(); + Long fileSize = context.getFileSize(); final long splitOffset = 0; final long splitLength = fileSize == null ? 
fileIO.getFileSize(filePath) : fileSize; diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java index c1bcaf906a42..5a0f4925ddbc 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.orc.filter.OrcFilters; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -181,7 +182,8 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException { LocalFileIO localFileIO = new LocalFileIO(); try (RecordReader reader = - format.createReader(localFileIO, flatFile, randomPooSize,localFileIO.getFileSize(flatFile))) { + format.createReader( + new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) { reader.forEachRemainingWithPosition( (rowPosition, row) -> { // check filter: _col0 > randomStart @@ -205,7 +207,8 @@ void testReadRowPositionWithTransformAndFilter() throws IOException { LocalFileIO localFileIO = new LocalFileIO(); try (RecordReader reader = - format.createReader(localFileIO, flatFile, randomPooSize,localFileIO.getFileSize(flatFile))) { + format.createReader( + new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) { reader.transform(row -> row) .filter(row -> row.getInt(1) % 123 == 0) .forEachRemainingWithPosition( From 4980295e82fcc992a9641c4c1253c48e8f0f7187 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Tue, 19 Mar 2024 11:23:23 +0800 Subject: [PATCH 4/4] fix test --- .../java/org/apache/paimon/io/KeyValueFileReadWriteTest.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 509677174306..aae61336f90d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -391,7 +391,6 @@ private void checkRollingFiles( @ParameterizedTest @ValueSource(strings = {"parquet", "orc", "avro"}) public void testReaderUseFileSizeFromMetadata(String format) throws Exception { - testWriteAndReadDataFileImpl(format); DataFileTestDataGenerator.Data data = gen.next(); KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format); DataFileMetaSerializer serializer = new DataFileMetaSerializer();