From 6d9aa4f136f7d3fede401434d74bf679530bad3e Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 24 Jan 2025 13:51:58 +0800 Subject: [PATCH] [parquet] Introduce LongIterator to Parquet RowIndexGenerator --- .../data/columnar/ColumnarRowIterator.java | 17 +++-- .../data/columnar/VectorizedRowIterator.java | 3 +- .../org/apache/paimon/utils/LongIterator.java | 69 +++++++++++++++++++ .../apache/paimon/utils/LongIteratorTest.java | 50 ++++++++++++++ .../parquet/newreader/ColumnarBatch.java | 3 +- .../parquet/newreader/RowIndexGenerator.java | 28 ++++---- .../VectorizedParquetRecordReader.java | 2 +- 7 files changed, 144 insertions(+), 28 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 1cd6e3199c90..980c15c3a0a4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.LongIterator; import org.apache.paimon.utils.RecyclableIterator; import org.apache.paimon.utils.VectorMappingUtils; @@ -51,17 +52,15 @@ public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable re } public void reset(long nextFilePos) { - long[] positions = new long[row.batch().getNumRows()]; - for (int i = 0; i < row.batch().getNumRows(); i++) { - positions[i] = nextFilePos++; - } - reset(positions); + reset(LongIterator.fromRange(nextFilePos, nextFilePos + positions.length)); } - public void reset(long[] positions) { - assert positions.length == row.batch().getNumRows(); - this.positions = positions; + public void reset(LongIterator positions) { this.num = row.batch().getNumRows(); + this.positions = new long[num]; + for (int i = 0; i < num; i++) { + this.positions[i] = positions.next(); + } this.nextPos = 0; } @@ -92,7 +91,7 @@ public Path filePath() { protected ColumnarRowIterator copy(ColumnVector[] vectors) { ColumnarRowIterator newIterator = new ColumnarRowIterator(filePath, row.copy(vectors), recycler); - newIterator.reset(positions); + newIterator.reset(LongIterator.fromArray(positions)); return newIterator; } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java index 592a7845fcdd..fa9b1ded8412 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java @@ -20,6 +20,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.VectorizedRecordIterator; +import org.apache.paimon.utils.LongIterator; import javax.annotation.Nullable; @@ -39,7 +40,7 @@ public VectorizedColumnBatch batch() { protected VectorizedRowIterator copy(ColumnVector[] vectors) { VectorizedRowIterator newIterator = new VectorizedRowIterator(filePath, row.copy(vectors), recycler); - newIterator.reset(positions); + newIterator.reset(LongIterator.fromArray(positions)); return newIterator; } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java b/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java new file mode 100644 index 000000000000..a63358f8e9a2 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.util.NoSuchElementException; + +/** Iterator for long. */ +public interface LongIterator { + + boolean hasNext(); + + long next(); + + static LongIterator fromRange(final long startInclusive, final long endExclusive) { + return new LongIterator() { + + private long i = startInclusive; + + @Override + public boolean hasNext() { + return i < endExclusive; + } + + @Override + public long next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return i++; + } + }; + } + + static LongIterator fromArray(final long[] longs) { + return new LongIterator() { + + private int i = 0; + + @Override + public boolean hasNext() { + return i < longs.length; + } + + @Override + public long next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return longs[i++]; + } + }; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java new file mode 100644 index 000000000000..dd90ef680b37 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class LongIteratorTest { + + @Test + public void testRange() { + LongIterator iterator = LongIterator.fromRange(5, 10); + List list = new ArrayList<>(); + while (iterator.hasNext()) { + list.add(iterator.next()); + } + assertThat(list).containsExactlyInAnyOrder(5L, 6L, 7L, 8L, 9L); + } + + @Test + public void testFromArray() { + long[] array = new long[] {5L, 6L, 7L, 8L, 9L}; + LongIterator iterator = LongIterator.fromArray(array); + List list = new ArrayList<>(); + while (iterator.hasNext()) { + list.add(iterator.next()); + } + assertThat(list).containsExactlyInAnyOrder(5L, 6L, 7L, 8L, 9L); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java index 6bf00c106c6c..70695b39848c 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.data.columnar.VectorizedRowIterator; import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.LongIterator; import java.util.Arrays; @@ -55,7 +56,7 @@ public ColumnarBatch(Path filePath, ColumnVector[] columns) { } /** Reset next record position and return self. */ - public void resetPositions(long[] positions) { + public void resetPositions(LongIterator positions) { vectorizedRowIterator.reset(positions); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java index 7e302a43f327..5de08910cdbf 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java @@ -18,10 +18,11 @@ package org.apache.paimon.format.parquet.newreader; +import org.apache.paimon.utils.LongIterator; + import org.apache.parquet.column.page.PageReadStore; -import java.util.Iterator; -import java.util.stream.Stream; +import java.util.PrimitiveIterator; /* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -30,37 +31,32 @@ /** Generate row index for columnar batch. */ public class RowIndexGenerator { - Iterator rowIndexIterator; + private LongIterator rowIndexIterator; public void initFromPageReadStore(PageReadStore pageReadStore) { long startingRowIdx = pageReadStore.getRowIndexOffset().orElse(0L); - - if (pageReadStore.getRowIndexes().isPresent()) { - final Iterator rowIndexes = pageReadStore.getRowIndexes().get(); + PrimitiveIterator.OfLong rowIndexes = pageReadStore.getRowIndexes().orElse(null); + if (rowIndexes != null) { rowIndexIterator = - new Iterator() { + new LongIterator() { @Override public boolean hasNext() { return rowIndexes.hasNext(); } @Override - public Long next() { - return rowIndexes.next() + startingRowIdx; + public long next() { + return rowIndexes.nextLong() + startingRowIdx; } }; } else { long numRowsInRowGroup = pageReadStore.getRowCount(); rowIndexIterator = - Stream.iterate(startingRowIdx, i -> i + 1).limit(numRowsInRowGroup).iterator(); + LongIterator.fromRange(startingRowIdx, startingRowIdx + numRowsInRowGroup); } } - public void populateRowIndex(ColumnarBatch columnarBatch, int numRows) { - long[] rowIndexes = new long[numRows]; - for (int i = 0; i < numRows; i++) { - rowIndexes[i] = rowIndexIterator.next(); - } - columnarBatch.resetPositions(rowIndexes); + public void populateRowIndex(ColumnarBatch columnarBatch) { + columnarBatch.resetPositions(rowIndexIterator); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java index e54fcd493f59..90123113dac1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java @@ -286,7 +286,7 @@ public boolean nextBatch() throws IOException { } rowsReturned += num; columnarBatch.setNumRows(num); - rowIndexGenerator.populateRowIndex(columnarBatch, num); + rowIndexGenerator.populateRowIndex(columnarBatch); return true; }