FormatReaderContext.java (new file)
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.RecordReader;
+
+/** The context for creating a {@link RecordReader}. */
+public class FormatReaderContext {
+    private final FileIO fileIO;
+    private final Path file;
+    private final Integer poolSize;
+    private final Long fileSize;
+
+    public FormatReaderContext(FileIO fileIO, Path file, Integer poolSize, Long fileSize) {
+        this.fileIO = fileIO;
+        this.file = file;
+        this.poolSize = poolSize;
+        this.fileSize = fileSize;
+    }
+
+    public FileIO getFileIO() {
+        return fileIO;
+    }
+
+    public Path getFile() {
+        return file;
+    }
+
+    public Integer getPoolSize() {
+        return poolSize;
+    }
+
+    public Long getFileSize() {
+        return fileSize;
+    }
+}
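As a usage note: everything a format needs to open a file now travels in this one object, and both `poolSize` and `fileSize` are nullable. A minimal construction sketch, assuming a local file (the example class, path, and size are made up; LocalFileIO and Path are the real Paimon classes used elsewhere in this diff):

import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;

public class FormatReaderContextExample {                 // hypothetical example class
    public static void main(String[] args) {
        Path file = new Path("/tmp/data-0.orc");          // hypothetical path
        // Size already known from table metadata (e.g. DataFileMeta.fileSize()):
        // readers can skip the extra FileIO.getFileSize() metadata call.
        FormatReaderContext withKnownSize =
                new FormatReaderContext(new LocalFileIO(), file, null, 4096L);
        // A null size tells readers to fall back to asking the filesystem,
        // matching the behavior before this change.
        FormatReaderContext withUnknownSize =
                new FormatReaderContext(new LocalFileIO(), file, null, null);
        System.out.println(withKnownSize.getFileSize() + " / " + withUnknownSize.getFileSize());
    }
}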
FormatReaderFactory.java
@@ -29,8 +29,9 @@
 /** A factory to create {@link RecordReader} for file. */
 public interface FormatReaderFactory extends Serializable {
 
-    RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException;
+    default RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException {
+        return createReader(new FormatReaderContext(fileIO, file, null, null));
+    }
 
-    RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
-            throws IOException;
+    RecordReader<InternalRow> createReader(FormatReaderContext context) throws IOException;
 }
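With the old pair of overloads collapsed into one method, a format implementation only has to handle the context. A hedged sketch of a custom factory under the new contract (MyFormatReaderFactory and openReader are hypothetical, not part of this PR; the size fallback mirrors what OrcReaderFactory and ParquetReaderFactory do further down):

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;

import java.io.IOException;

public class MyFormatReaderFactory implements FormatReaderFactory {   // hypothetical

    @Override
    public RecordReader<InternalRow> createReader(FormatReaderContext context)
            throws IOException {
        FileIO fileIO = context.getFileIO();
        Path file = context.getFile();
        // Prefer the size carried in the context; only touch the filesystem
        // when the caller did not already know it.
        long fileSize =
                context.getFileSize() == null
                        ? fileIO.getFileSize(file)
                        : context.getFileSize();
        return openReader(fileIO, file, fileSize);                     // hypothetical
    }

    private RecordReader<InternalRow> openReader(FileIO fileIO, Path file, long fileSize) {
        throw new UnsupportedOperationException("format-specific reader goes here");
    }
}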
KeyValueDataFileRecordReader.java
@@ -26,6 +26,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -58,13 +59,13 @@ public KeyValueDataFileRecordReader(
             @Nullable Integer poolSize,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
+            @Nullable PartitionInfo partitionInfo,
+            long fileSize)
             throws IOException {
         FileUtils.checkExists(fileIO, path);
         this.reader =
-                poolSize == null
-                        ? readerFactory.createReader(fileIO, path)
-                        : readerFactory.createReader(fileIO, path, poolSize);
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, path, poolSize, fileSize));
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
         this.indexMapping = indexMapping;
KeyValueFileReaderFactory.java
@@ -92,17 +92,18 @@ public RecordReader<KeyValue> createRecordReader(
             long schemaId, String fileName, long fileSize, int level) throws IOException {
         if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
             return new AsyncRecordReader<>(
-                    () -> createRecordReader(schemaId, fileName, level, false, 2));
+                    () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
         }
-        return createRecordReader(schemaId, fileName, level, true, null);
+        return createRecordReader(schemaId, fileName, level, true, null, fileSize);
     }
 
     private RecordReader<KeyValue> createRecordReader(
             long schemaId,
             String fileName,
             int level,
             boolean reuseFormat,
-            @Nullable Integer poolSize)
+            @Nullable Integer poolSize,
+            long fileSize)
             throws IOException {
         String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);
 
@@ -130,7 +131,8 @@ private RecordReader<KeyValue> createRecordReader(
                         poolSize,
                         bulkFormatMapping.getIndexMapping(),
                         bulkFormatMapping.getCastMapping(),
-                        PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+                        PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
+                        fileSize);
         Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
         if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
             recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get());
RowDataFileRecordReader.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -46,12 +47,15 @@ public class RowDataFileRecordReader implements RecordReader<InternalRow> {
     public RowDataFileRecordReader(
             FileIO fileIO,
             Path path,
+            long fileSize,
             FormatReaderFactory readerFactory,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
             @Nullable PartitionInfo partitionInfo)
             throws IOException {
-        this.reader = FileUtils.createFormatReader(fileIO, readerFactory, path);
+        FileUtils.checkExists(fileIO, path);
+        FormatReaderContext context = new FormatReaderContext(fileIO, path, null, fileSize);
+        this.reader = readerFactory.createReader(context);
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
AppendOnlyFileStoreRead.java
@@ -174,6 +174,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
                         new RowDataFileRecordReader(
                                 fileIO,
                                 dataFilePathFactory.toPath(file.fileName()),
+                                file.fileSize(),
                                 bulkFormatMapping.getReaderFactory(),
                                 bulkFormatMapping.getIndexMapping(),
                                 bulkFormatMapping.getCastMapping(),
KeyValueFileReadWriteTest.java
@@ -48,6 +48,8 @@
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -385,4 +387,29 @@ private void checkRollingFiles(
             assertThat(meta.level()).isEqualTo(expected.level());
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"parquet", "orc", "avro"})
+    public void testReaderUseFileSizeFromMetadata(String format) throws Exception {
+        DataFileTestDataGenerator.Data data = gen.next();
+        KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
+        DataFileMetaSerializer serializer = new DataFileMetaSerializer();
+
+        RollingFileWriter<KeyValue, DataFileMeta> writer =
+                writerFactory.createRollingMergeTreeFileWriter(0);
+        writer.write(CloseableIterator.fromList(data.content, kv -> {}));
+        writer.close();
+        List<DataFileMeta> actualMetas = writer.result();
+
+        KeyValueFileReaderFactory readerFactory =
+                createReaderFactory(tempDir.toString(), format, null, null);
+        assertData(
+                data,
+                actualMetas,
+                TestKeyValueGenerator.KEY_SERIALIZER,
+                TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
+                serializer,
+                readerFactory,
+                kv -> kv);
+    }
 }
AvroBulkFormat.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.avro;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -49,14 +50,9 @@ public AvroBulkFormat(RowType projectedRowType) {
     }
 
     @Override
-    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws IOException {
-        return new AvroReader(fileIO, file);
-    }
-
-    @Override
-    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
+    public RecordReader<InternalRow> createReader(FormatReaderContext formatReaderContext)
             throws IOException {
-        throw new UnsupportedOperationException();
+        return new AvroReader(formatReaderContext.getFileIO(), formatReaderContext.getFile());
     }
 
     private class AvroReader implements RecordReader<InternalRow> {
OrcReaderFactory.java
@@ -23,6 +23,7 @@
 import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
 import org.apache.paimon.format.orc.filter.OrcFilters;
@@ -88,14 +89,13 @@ public OrcReaderFactory(
     // ------------------------------------------------------------------------
 
     @Override
-    public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws IOException {
-        return createReader(fileIO, file, 1);
-    }
-
-    @Override
-    public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize)
-            throws IOException {
+    public OrcVectorizedReader createReader(FormatReaderContext context) throws IOException {
+        int poolSize = context.getPoolSize() == null ? 1 : context.getPoolSize();
         Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(poolSize);
 
+        FileIO fileIO = context.getFileIO();
+        Long fileSize = context.getFileSize();
+        Path file = context.getFile();
         RecordReader orcReader =
                 createRecordReader(
                         hadoopConfigWrapper.getHadoopConfig(),
@@ -104,7 +104,7 @@ public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize)
                         fileIO,
                         file,
                         0,
-                        fileIO.getFileSize(file));
+                        fileSize == null ? fileIO.getFileSize(file) : fileSize);
         return new OrcVectorizedReader(orcReader, poolOfBatches);
     }

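Together with the Parquet change below, this is the crux of the PR: prefer the size carried by the context, and only fall back to a filesystem metadata call when it is absent. A standalone sketch of that guard, with a hypothetical utility class that is not part of this PR:

import org.apache.paimon.format.FormatReaderContext;

import java.io.IOException;

public final class FileSizes {                            // hypothetical utility class
    private FileSizes() {}

    // Mirrors the guard in OrcReaderFactory and ParquetReaderFactory: a null
    // size in the context costs one extra FileIO.getFileSize() metadata call.
    static long resolveFileSize(FormatReaderContext context) throws IOException {
        Long fileSize = context.getFileSize();
        return fileSize == null
                ? context.getFileIO().getFileSize(context.getFile())
                : fileSize;
    }
}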
ParquetReaderFactory.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.parquet.reader.ColumnReader;
 import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
@@ -87,9 +88,12 @@ public ParquetReaderFactory(Options conf, RowType projectedType, int batchSize)
     }
 
     @Override
-    public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOException {
+    public ParquetReader createReader(FormatReaderContext context) throws IOException {
+        Path filePath = context.getFile();
+        FileIO fileIO = context.getFileIO();
+        Long fileSize = context.getFileSize();
         final long splitOffset = 0;
-        final long splitLength = fileIO.getFileSize(filePath);
+        final long splitLength = fileSize == null ? fileIO.getFileSize(filePath) : fileSize;
 
         ParquetReadOptions.Builder builder =
                 ParquetReadOptions.builder().withRange(splitOffset, splitOffset + splitLength);
@@ -108,12 +112,6 @@ public ParquetReader createReader(FileIO fileIO, Path filePath) throws IOException {
         return new ParquetReader(reader, requestedSchema, reader.getRecordCount(), poolOfBatches);
     }
 
-    @Override
-    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
-            throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
     private void setReadOptions(ParquetReadOptions.Builder builder) {
         builder.useSignedStringMinMax(
                 conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
OrcReaderFactoryTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.orc;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.orc.filter.OrcFilters;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -179,8 +180,10 @@ void testReadRowPositionWithRandomFilterAndPool() throws IOException {

         AtomicBoolean isFirst = new AtomicBoolean(true);
 
+        LocalFileIO localFileIO = new LocalFileIO();
         try (RecordReader<InternalRow> reader =
-                format.createReader(new LocalFileIO(), flatFile, randomPooSize)) {
+                format.createReader(
+                        new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) {
             reader.forEachRemainingWithPosition(
                     (rowPosition, row) -> {
                         // check filter: _col0 > randomStart
@@ -202,8 +205,10 @@ void testReadRowPositionWithTransformAndFilter() throws IOException {
         int randomPooSize = new Random().nextInt(3) + 1;
         OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
 
+        LocalFileIO localFileIO = new LocalFileIO();
         try (RecordReader<InternalRow> reader =
-                format.createReader(new LocalFileIO(), flatFile, randomPooSize)) {
+                format.createReader(
+                        new FormatReaderContext(localFileIO, flatFile, randomPooSize, null))) {
             reader.transform(row -> row)
                     .filter(row -> row.getInt(1) % 123 == 0)
                     .forEachRemainingWithPosition(