From bb59905df28063057f14a8641201f2ee4c0e990b Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Sat, 29 Jun 2024 09:45:42 +0800
Subject: [PATCH] first version

---
 .../format/parquet/ParquetReaderFactory.java  | 27 +++++---
 .../parquet/hadoop/ParquetFileReader.java     | 20 +++++-
 .../format/parquet/ParquetReadWriteTest.java  | 61 +++++++++++++++++++
 .../paimon/spark/sql/DeletionVectorTest.scala | 43 +++++++++++++
 4 files changed, 143 insertions(+), 8 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2b2c651a8538..afaf8a501423 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -261,6 +261,8 @@ private class ParquetReader implements RecordReader<InternalRow> {
         /** The current row's position in the file. */
         private long currentRowPosition;
 
+        private long nextRowPosition;
+
         /**
          * For each request column, the reader to read this column. This is NULL if this column is
          * missing from the file, in which case we populate the attribute with NULL.
@@ -280,6 +282,7 @@ private ParquetReader(
             this.rowsReturned = 0;
             this.totalCountLoadedSoFar = 0;
             this.currentRowPosition = 0;
+            this.nextRowPosition = 0;
         }
 
         @Nullable
@@ -287,13 +290,12 @@ private ParquetReader(
         public RecordIterator<InternalRow> readBatch() throws IOException {
             final ParquetReaderBatch batch = getCachedEntry();
 
-            long rowNumber = currentRowPosition;
             if (!nextBatch(batch)) {
                 batch.recycle();
                 return null;
             }
 
-            return batch.convertAndGetIterator(rowNumber);
+            return batch.convertAndGetIterator(currentRowPosition);
         }
 
         /** Advances to the next batch of rows. Returns false if there are no more. */
@@ -307,6 +309,8 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
             }
             if (rowsReturned == totalCountLoadedSoFar) {
                 readNextRowGroup();
+            } else {
+                currentRowPosition = nextRowPosition;
             }
 
             int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
@@ -319,14 +323,14 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
                 }
             }
             rowsReturned += num;
-            currentRowPosition += num;
+            nextRowPosition = currentRowPosition + num;
             batch.columnarBatch.setNumRows(num);
             return true;
         }
 
         private void readNextRowGroup() throws IOException {
-            PageReadStore pages = reader.readNextRowGroup();
-            if (pages == null) {
+            PageReadStore rowGroup = reader.readNextRowGroup();
+            if (rowGroup == null) {
                 throw new IOException(
                         "expecting more rows but reached last block. Read "
                                 + rowsReturned
@@ -343,11 +347,20 @@ private void readNextRowGroup() throws IOException {
                                 projectedTypes[i],
                                 types.get(i),
                                 requestedSchema.getColumns(),
-                                pages,
+                                rowGroup,
                                 0);
                 }
             }
-            totalCountLoadedSoFar += pages.getRowCount();
+            totalCountLoadedSoFar += rowGroup.getRowCount();
+            if (rowGroup.getRowIndexOffset().isPresent()) {
+                currentRowPosition = rowGroup.getRowIndexOffset().get();
+            } else {
+                if (reader.rowGroupsFiltered()) {
+                    throw new RuntimeException(
+                            "There is a bug: rowIndexOffset must be present when row groups are filtered.");
+                }
+                currentRowPosition = nextRowPosition;
+            }
         }
 
         private ParquetReaderBatch getCachedEntry() throws IOException {
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 118ba008710d..aca1f021b98f 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -215,6 +215,7 @@ private static ParquetMetadata readFooter(
     private final List<BlockMetaData> blocks;
     private final List<ColumnIndexStore> blockIndexStores;
     private final List<RowRanges> blockRowRanges;
+    private final boolean blocksFiltered;
 
     // not final. in some cases, this may be lazily loaded for backward-compat.
     private ParquetMetadata footer;
@@ -247,6 +248,7 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx
 
         try {
             this.blocks = filterRowGroups(footer.getBlocks());
+            this.blocksFiltered = this.blocks.size() != footer.getBlocks().size();
         } catch (Exception e) {
             // In case that filterRowGroups throws an exception in the constructor, the new stream
             // should be closed. Otherwise, there's no way to close this outside.
@@ -265,6 +267,18 @@ private static <T> List<T> listWithNulls(int size) {
         return new ArrayList<>(Collections.nCopies(size, null));
     }
 
+    private boolean checkRowIndexOffsetExists(List<BlockMetaData> blocks) {
+        for (BlockMetaData block : blocks) {
+            if (block.getRowIndexOffset() == -1) {
+                LOG.warn(
+                        "Row index offset was not found in the block metadata of file {}; skip applying the filter so that row positions stay correct",
+                        file.getPath());
+                return false;
+            }
+        }
+        return true;
+    }
+
     public ParquetMetadata getFooter() {
         if (footer == null) {
             try {
@@ -319,7 +333,7 @@ public String getFile() {
 
     private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
         FilterCompat.Filter recordFilter = options.getRecordFilter();
-        if (FilterCompat.isFilteringRequired(recordFilter)) {
+        if (checkRowIndexOffsetExists(blocks) && FilterCompat.isFilteringRequired(recordFilter)) {
             // set up data filters based on configured levels
             List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
 
@@ -340,6 +354,10 @@ private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws I
         return blocks;
     }
 
+    public boolean rowGroupsFiltered() {
+        return blocksFiltered;
+    }
+
     public List<BlockMetaData> getRowGroups() {
         return blocks;
     }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 4457db5e81f3..00a7a0081045 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -31,10 +31,12 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.DoubleType;
@@ -49,6 +51,7 @@
 import org.apache.paimon.types.VarCharType;
 
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.ParquetFilters;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -61,11 +64,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -333,6 +338,62 @@ void testReadRowPosition() throws IOException {
         }
     }
 
+    @RepeatedTest(10)
+    void testReadRowPositionWithRandomFilter() throws IOException {
+        int recordNumber = new Random().nextInt(10000) + 1;
+        int batchSize = new Random().nextInt(1000) + 1;
+        // Set the row group size to 1 so that the row count of one row group becomes
+        // `parquet.page.size.row.check.min`, whose default value is 100.
+        int rowGroupSize = 1;
+        int rowGroupCount = 100;
+        int randomStart = new Random().nextInt(10000) + 1;
+        List<InternalRow> records = new ArrayList<>(recordNumber);
+        for (int i = 0; i < recordNumber; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+
+        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+
+        DataType[] fieldTypes = new DataType[] {new IntType()};
+        // Build the filter: f4 > randomStart
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(new DataField(0, "f4", new IntType()))));
+        FilterCompat.Filter filter =
+                ParquetFilters.convert(
+                        PredicateBuilder.splitAnd(builder.greaterThan(0, randomStart)));
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(
+                        new Options(),
+                        RowType.builder().fields(fieldTypes, new String[] {"f4"}).build(),
+                        batchSize,
+                        filter);
+
+        AtomicBoolean isFirst = new AtomicBoolean(true);
+        try (RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(),
+                                testPath,
+                                new LocalFileIO().getFileSize(testPath)))) {
+            reader.forEachRemainingWithPosition(
+                    (rowPosition, row) -> {
+                        // Check the filter. Note: filtering works at row-group granularity,
+                        // so the first returned row lies within one row group of randomStart.
+                        if (isFirst.get()) {
+                            assertThat(randomStart - row.getInt(0)).isLessThan(rowGroupCount);
+                            isFirst.set(false);
+                        }
+                        // Check the row position.
+                        // Note: in the written file, field f4's value equals the row position,
+                        // so we can use it to verify the position returned by the reader.
+                        assertThat(rowPosition).isEqualTo(row.getInt(0));
+                    });
+        }
+    }
+
     private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
             throws IOException {
         List<InternalRow> rows = records.stream().map(this::newRow).collect(Collectors.toList());
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 02d898f97dd0..bc4ebfde3f5f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api.Assertions
 
 import scala.collection.JavaConverters._
@@ -303,6 +304,48 @@ class DeletionVectorTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon deletionVector: select with format filter push down") {
+    val format = Random.shuffle(Seq("parquet", "orc", "avro")).head
+    val blockSize = Random.nextInt(10240) + 1
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'id',
+                 | 'deletion-vectors.enabled' = 'true',
+                 | 'file.format' = '$format',
+                 | 'file.block-size' = '${blockSize}b',
+                 | 'bucket' = '1'
+                 |)
+                 |""".stripMargin)
+
+    spark
+      .range(1, 10001)
+      .toDF("id")
+      .withColumn("name", lit("a"))
+      .createOrReplaceTempView("source_tbl")
+
+    sql("INSERT INTO T SELECT * FROM source_tbl")
+    // update two rows to create deletion vectors
+    sql("INSERT INTO T VALUES (1111, 'a_new'), (2222, 'a_new')")
+
+    checkAnswer(
+      sql("SELECT count(*) FROM T"),
+      Seq(10000).toDF()
+    )
+    checkAnswer(
+      sql("SELECT count(*) FROM T WHERE (id > 1000 and id <= 2000) or (id > 3000 and id <= 4000)"),
+      Seq(2000).toDF()
+    )
+    checkAnswer(
+      spark.sql("SELECT name FROM T WHERE id = 1111"),
+      Seq("a_new").toDF()
+    )
+    checkAnswer(
+      spark.sql("SELECT name FROM T WHERE id = 2222"),
+      Seq("a_new").toDF()
+    )
+  }
+
   private def getPathName(path: String): String = {
     new Path(path).getName
   }
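
Reviewer note (not part of the patch): the heart of this change is the currentRowPosition / nextRowPosition bookkeeping in nextBatch() and readNextRowGroup(). The standalone Java sketch below replays that control flow against a mocked list of surviving row groups. The class RowPositionSketch, the Group type, its fields, and the printed output are invented for illustration only; only the position-tracking logic mirrors the patch, including its assumption that nextRowPosition can be trusted as a fallback only when no row group was filtered out.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;

public class RowPositionSketch {

    /** Stand-in for one surviving Parquet row group: row count plus its first row index. */
    static final class Group {
        final long rowCount;
        final OptionalLong rowIndexOffset;

        Group(long rowCount, long rowIndexOffset) {
            this.rowCount = rowCount;
            this.rowIndexOffset = OptionalLong.of(rowIndexOffset);
        }
    }

    public static void main(String[] args) {
        // Three row groups of 100 rows survive filtering; the group covering rows
        // 100..199 was pruned, so the second surviving group starts at row 200.
        List<Group> groups =
                Arrays.asList(new Group(100, 0), new Group(100, 200), new Group(100, 300));
        Iterator<Group> nextGroup = groups.iterator();

        long batchSize = 30;
        long rowsReturned = 0;
        long totalCountLoadedSoFar = 0;
        long currentRowPosition = 0;
        long nextRowPosition = 0;

        while (rowsReturned < totalCountLoadedSoFar || nextGroup.hasNext()) {
            if (rowsReturned == totalCountLoadedSoFar) {
                // readNextRowGroup(): trust the footer's row index when present,
                // otherwise fall back to the running position (only safe when no
                // row group was filtered out).
                Group group = nextGroup.next();
                totalCountLoadedSoFar += group.rowCount;
                currentRowPosition =
                        group.rowIndexOffset.isPresent()
                                ? group.rowIndexOffset.getAsLong()
                                : nextRowPosition;
            } else {
                // Later batches within the same row group continue from where the
                // previous batch ended.
                currentRowPosition = nextRowPosition;
            }
            long num = Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
            System.out.printf("batch of %d rows starting at row position %d%n", num, currentRowPosition);
            rowsReturned += num;
            nextRowPosition = currentRowPosition + num;
        }
    }
}

Running the sketch prints batches starting at positions 0, 30, 60, 90 and then jumps to 200 for the second surviving group, which is the same behavior the new testReadRowPositionWithRandomFilter asserts through the f4 column.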