@@ -261,6 +261,8 @@ private class ParquetReader implements RecordReader<InternalRow> {
         /** The current row's position in the file. */
         private long currentRowPosition;
 
+        private long nextRowPosition;
+
         /**
          * For each requested column, the reader to read this column. This is NULL if this column is
          * missing from the file, in which case we populate the attribute with NULL.
@@ -280,20 +282,20 @@ private ParquetReader(
             this.rowsReturned = 0;
             this.totalCountLoadedSoFar = 0;
             this.currentRowPosition = 0;
+            this.nextRowPosition = 0;
         }
 
         @Nullable
         @Override
         public RecordIterator<InternalRow> readBatch() throws IOException {
             final ParquetReaderBatch batch = getCachedEntry();
 
-            long rowNumber = currentRowPosition;
             if (!nextBatch(batch)) {
                 batch.recycle();
                 return null;
             }
 
-            return batch.convertAndGetIterator(rowNumber);
+            return batch.convertAndGetIterator(currentRowPosition);
         }
 
         /** Advances to the next batch of rows. Returns false if there are no more. */
@@ -307,6 +309,8 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
             }
             if (rowsReturned == totalCountLoadedSoFar) {
                 readNextRowGroup();
+            } else {
+                currentRowPosition = nextRowPosition;
             }
 
             int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
@@ -319,14 +323,14 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
                 }
             }
             rowsReturned += num;
-            currentRowPosition += num;
+            nextRowPosition = currentRowPosition + num;
             batch.columnarBatch.setNumRows(num);
             return true;
         }
 
         private void readNextRowGroup() throws IOException {
-            PageReadStore pages = reader.readNextRowGroup();
-            if (pages == null) {
+            PageReadStore rowGroup = reader.readNextRowGroup();
+            if (rowGroup == null) {
                 throw new IOException(
                         "expecting more rows but reached last block. Read "
                                 + rowsReturned
@@ -343,11 +347,20 @@ private void readNextRowGroup() throws IOException {
                             projectedTypes[i],
                             types.get(i),
                             requestedSchema.getColumns(),
-                            pages,
+                            rowGroup,
                             0);
                 }
             }
-            totalCountLoadedSoFar += pages.getRowCount();
+            totalCountLoadedSoFar += rowGroup.getRowCount();
+            if (rowGroup.getRowIndexOffset().isPresent()) {
+                currentRowPosition = rowGroup.getRowIndexOffset().get();
+            } else {
+                if (reader.rowGroupsFiltered()) {
+                    throw new RuntimeException(
+                            "Bug: rowIndexOffset must be present when row groups are filtered.");
+                }
+                currentRowPosition = nextRowPosition;
+            }
         }
 
         private ParquetReaderBatch getCachedEntry() throws IOException {
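
The two counters work together as follows: `currentRowPosition` is the absolute position in the file of the first row of the batch being returned, and `nextRowPosition` is the position just past that batch. Within a row group the two advance in lockstep, but when `readNextRowGroup()` loads a new group, `currentRowPosition` jumps to the group's `rowIndexOffset`, which accounts for any row groups the pushed-down filter skipped. Below is a self-contained sketch (illustration only; the names mirror the fields above, but no real Parquet file is involved) that simulates this bookkeeping for a file with three 100-row row groups whose middle group was filtered out:

    import java.util.Arrays;
    import java.util.List;

    public class RowPositionSketch {
        public static void main(String[] args) {
            // Surviving row groups as (rowIndexOffset, rowCount) pairs; the group
            // covering rows [100, 200) was filtered out, so positions jump 100 -> 200.
            List<long[]> groups = Arrays.asList(new long[] {0, 100}, new long[] {200, 100});
            int batchSize = 60;

            long currentRowPosition = 0;
            long nextRowPosition = 0;
            long rowsReturned = 0;
            long totalCountLoadedSoFar = 0;
            int groupIndex = 0;

            while (true) {
                if (rowsReturned == totalCountLoadedSoFar) {
                    if (groupIndex == groups.size()) {
                        break; // no more row groups
                    }
                    long[] group = groups.get(groupIndex++);
                    totalCountLoadedSoFar += group[1];
                    currentRowPosition = group[0]; // jump to the group's rowIndexOffset
                } else {
                    currentRowPosition = nextRowPosition; // continue within the same group
                }
                long num = Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
                rowsReturned += num;
                nextRowPosition = currentRowPosition + num;
                System.out.println("batch of " + num + " rows starting at row " + currentRowPosition);
            }
            // Batches start at rows 0, 60, 200, 260 -- the jump from 100 to 200
            // is exactly what the rowIndexOffset handling above provides.
        }
    }
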
@@ -215,6 +215,7 @@ private static ParquetMetadata readFooter(
     private final List<BlockMetaData> blocks;
     private final List<ColumnIndexStore> blockIndexStores;
     private final List<RowRanges> blockRowRanges;
+    private final boolean blocksFiltered;
 
     // not final. in some cases, this may be lazily loaded for backward-compat.
     private ParquetMetadata footer;
@@ -247,6 +248,7 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
 
         try {
             this.blocks = filterRowGroups(footer.getBlocks());
+            this.blocksFiltered = this.blocks.size() != footer.getBlocks().size();
         } catch (Exception e) {
             // In case filterRowGroups throws an exception in the constructor, the new stream
             // should be closed. Otherwise, there's no way to close it from the outside.
@@ -265,6 +267,18 @@ private static <T> List<T> listWithNulls(int size) {
         return new ArrayList<>(Collections.nCopies(size, null));
     }
 
+    private boolean checkRowIndexOffsetExists(List<BlockMetaData> blocks) {
+        for (BlockMetaData block : blocks) {
+            if (block.getRowIndexOffset() == -1) {
+                LOG.warn(
+                        "Row index offset not found in block metadata of file {}; skipping row-group filtering to keep row positions correct",
+                        file.getPath());
+                return false;
+            }
+        }
+        return true;
+    }
+
     public ParquetMetadata getFooter() {
         if (footer == null) {
             try {
@@ -319,7 +333,7 @@ public String getFile() {
 
     private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
         FilterCompat.Filter recordFilter = options.getRecordFilter();
-        if (FilterCompat.isFilteringRequired(recordFilter)) {
+        if (checkRowIndexOffsetExists(blocks) && FilterCompat.isFilteringRequired(recordFilter)) {
             // set up data filters based on configured levels
             List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
 
@@ -340,6 +354,10 @@ private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
         return blocks;
     }
 
+    public boolean rowGroupsFiltered() {
+        return blocksFiltered;
+    }
+
     public List<BlockMetaData> getRowGroups() {
         return blocks;
     }
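
The guard in `filterRowGroups` and the new `rowGroupsFiltered()` accessor together establish one invariant: row groups are only ever dropped when every block's row index offset is known. A minimal sketch of that invariant in isolation, assuming the class names from this diff (the `check` helper itself is hypothetical, not part of the patch):

    import java.util.List;

    import org.apache.parquet.hadoop.metadata.BlockMetaData;

    class RowIndexInvariant {
        // Once any row group has been dropped by a filter, every surviving row
        // group must carry a valid row index offset (-1 marks a missing one),
        // or absolute row positions cannot be reconstructed downstream.
        static void check(boolean rowGroupsFiltered, List<BlockMetaData> rowGroups) {
            if (!rowGroupsFiltered) {
                return; // nothing was skipped: positions can be counted cumulatively
            }
            for (BlockMetaData block : rowGroups) {
                if (block.getRowIndexOffset() == -1) {
                    throw new IllegalStateException(
                            "filtered read requires row index offsets in block metadata");
                }
            }
        }
    }
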
@@ -31,10 +31,12 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.DoubleType;
@@ -49,6 +51,7 @@
 import org.apache.paimon.types.VarCharType;
 
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.ParquetFilters;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -61,11 +64,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -333,6 +338,62 @@ void testReadRowPosition() throws IOException {
         }
     }
 
+    @RepeatedTest(10)
+    void testReadRowPositionWithRandomFilter() throws IOException {
+        int recordNumber = new Random().nextInt(10000) + 1;
+        int batchSize = new Random().nextInt(1000) + 1;
+        // With row group size = 1, each row group ends up holding
+        // `parquet.page.size.row.check.min` rows, whose default value is 100.
+        int rowGroupSize = 1;
+        int rowGroupCount = 100;
+        int randomStart = new Random().nextInt(10000) + 1;
+        List<InternalRow> records = new ArrayList<>(recordNumber);
+        for (int i = 0; i < recordNumber; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+
+        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+
+        DataType[] fieldTypes = new DataType[] {new IntType()};
+        // Build the filter: f4 > randomStart.
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(new DataField(0, "f4", new IntType()))));
+        FilterCompat.Filter filter =
+                ParquetFilters.convert(
+                        PredicateBuilder.splitAnd(builder.greaterThan(0, randomStart)));
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(
+                        new Options(),
+                        RowType.builder().fields(fieldTypes, new String[] {"f4"}).build(),
+                        batchSize,
+                        filter);
+
+        AtomicBoolean isFirst = new AtomicBoolean(true);
+        try (RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(),
+                                testPath,
+                                new LocalFileIO().getFileSize(testPath)))) {
+            reader.forEachRemainingWithPosition(
+                    (rowPosition, row) -> {
+                        // Check the filter.
+                        // Note: the minimum unit of filtering is a row group.
+                        if (isFirst.get()) {
+                            assertThat(randomStart - row.getInt(0)).isLessThan(rowGroupCount);
+                            isFirst.set(false);
+                        }
+                        // Check the row position.
+                        // Note: in the written file, field f4's value equals its row position,
+                        // so we can use it to verify the row position.
+                        assertThat(rowPosition).isEqualTo(row.getInt(0));
+                    });
+        }
+    }
+
     private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
             throws IOException {
         List<InternalRow> rows = records.stream().map(this::newRow).collect(Collectors.toList());
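
For reference, the `f4 > randomStart` filter that the test builds through `PredicateBuilder` and `ParquetFilters.convert` should be equivalent to constructing the predicate directly with parquet-mr's `FilterApi`, roughly as in this sketch (the `greaterThan` helper name is ours, not an existing API):

    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;

    class FilterSketch {
        // Builds `column > value` for an INT32 column, e.g. greaterThan("f4", 42).
        static FilterCompat.Filter greaterThan(String column, int value) {
            FilterPredicate predicate = FilterApi.gt(FilterApi.intColumn(column), value);
            return FilterCompat.get(predicate);
        }
    }
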
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api.Assertions
 
 import scala.collection.JavaConverters._
@@ -303,6 +304,48 @@ class DeletionVectorTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon deletionVector: select with format filter push down") {
+    val format = Random.shuffle(Seq("parquet", "orc", "avro")).head
+    val blockSize = Random.nextInt(10240) + 1
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'id',
+                 | 'deletion-vectors.enabled' = 'true',
+                 | 'file.format' = '$format',
+                 | 'file.block-size' = '${blockSize}b',
+                 | 'bucket' = '1'
+                 |)
+                 |""".stripMargin)
+
+    spark
+      .range(1, 10001)
+      .toDF("id")
+      .withColumn("name", lit("a"))
+      .createOrReplaceTempView("source_tbl")
+
+    sql("INSERT INTO T SELECT * FROM source_tbl")
+    // Overwrite two rows so that deletion vectors are created.
+    sql("INSERT INTO T VALUES (1111, 'a_new'), (2222, 'a_new')")
+
+    checkAnswer(
+      sql("SELECT count(*) FROM T"),
+      Seq(10000).toDF()
+    )
+    checkAnswer(
+      sql("SELECT count(*) FROM T WHERE (id > 1000 and id <= 2000) or (id > 3000 and id <= 4000)"),
+      Seq(2000).toDF()
+    )
+    checkAnswer(
+      spark.sql("SELECT name FROM T WHERE id = 1111"),
+      Seq("a_new").toDF()
+    )
+    checkAnswer(
+      spark.sql("SELECT name FROM T WHERE id = 2222"),
+      Seq("a_new").toDF()
+    )
+  }
+
   private def getPathName(path: String): String = {
     new Path(path).getName
   }
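
This test matters because a deletion vector identifies deleted rows by their position within a data file: if format-level filter pushdown skipped row groups without correcting the reported positions, the vector would be applied to the wrong rows. A toy illustration of that failure mode (hypothetical positions, not Paimon's API):

    import java.util.Set;

    class DvSketch {
        public static void main(String[] args) {
            Set<Long> deletedPositions = Set.of(1110L, 2221L); // hypothetical DV content
            // Correct absolute position: the old version of the row stays hidden.
            System.out.println(deletedPositions.contains(1110L)); // true
            // Position off by a skipped 100-row group: the deleted row reappears.
            System.out.println(deletedPositions.contains(1110L - 100L)); // false
        }
    }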