From 606abd70f657f09894a2bb5301f5436c30e41b42 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sat, 3 Aug 2024 00:52:43 +0800 Subject: [PATCH 1/4] [parquet] Child vector of complex type should arrange elements compactly (like orc) --- .../parquet/reader/NestedPositionUtil.java | 39 +++++++++------ .../reader/NestedPrimitiveColumnReader.java | 47 ++++++++++++++++--- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java index 5f0757c23589..1c8298eaeb6a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java @@ -24,6 +24,8 @@ import org.apache.paimon.utils.BooleanArrayList; import org.apache.paimon.utils.LongArrayList; +import java.util.Arrays; + import static java.lang.String.format; /** Utils to calculate nested type position. */ @@ -105,12 +107,21 @@ public static CollectionPosition calculateCollectionOffsets( for (int i = 0; i < definitionLevels.length; i = getNextCollectionStartIndex(repetitionLevels, collectionRepetitionLevel, i)) { - valueCount++; if (definitionLevels[i] >= collectionDefinitionLevel - 1) { boolean isNull = isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel); - nullCollectionFlags.add(isNull); - nullValuesCount += isNull ? 1 : 0; + if (isNull) { + nullCollectionFlags.add(true); + nullValuesCount++; + // 1. don't increase offset for null values + // 2. offsets and emptyCollectionFlags are meaningless for null values, but they + // must be set at each index for calculating lengths later + offsets.add(offset); + emptyCollectionFlags.add(false); + continue; + } + + nullCollectionFlags.add(false); // definitionLevels[i] > collectionDefinitionLevel => Collection is defined and not // empty // definitionLevels[i] == collectionDefinitionLevel => Collection is defined but @@ -119,22 +130,22 @@ public static CollectionPosition calculateCollectionOffsets( emptyCollectionFlags.add(false); offset += getCollectionSize(repetitionLevels, collectionRepetitionLevel, i + 1); } else if (definitionLevels[i] == collectionDefinitionLevel) { - offset++; + // don't increase offset for empty values emptyCollectionFlags.add(true); } else { - offset++; - emptyCollectionFlags.add(false); + throw new IllegalStateException( + String.format( + "This case should be handled as null value. " + + "index: %d, definitionLevels: %s, collectionDefinitionLevel: %s.", + i, + Arrays.toString(definitionLevels), + collectionDefinitionLevel)); } offsets.add(offset); - } else { - // when definitionLevels[i] < collectionDefinitionLevel - 1, it means the collection - // is - // not defined, but we need to regard it as null to avoid getting value wrong. - nullCollectionFlags.add(true); - nullValuesCount++; - offsets.add(++offset); - emptyCollectionFlags.add(false); + valueCount++; } + // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the + // collection is not defined, just ignore it } long[] offsetsArray = offsets.toArray(); long[] length = calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 0b0d89d4de2c..0b9fdb1f6a1d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -109,7 +109,7 @@ public class NestedPrimitiveColumnReader implements ColumnReader [5, 4, 5, 3, 2, 1, 0] + * + * + * + *

When (definitionLevel <= maxDefLevel - 2) we skip the value because children ColumnVector + * for OrcArrayColumnVector and OrcMapColumnVector don't contain empty and null set value. Stay + * consistent here. + */ private boolean readValue() throws IOException { int left = readPageIfNeed(); if (left > 0) { @@ -196,12 +215,14 @@ private boolean readValue() throws IOException { if (definitionLevel == maxDefLevel) { if (isCurrentPageDictionaryEncoded) { int dictionaryId = dataColumn.readValueDictionaryId(); - lastValue = dictionaryDecodeValue(dataType, dictionaryId); + lastValue.setValue(dictionaryDecodeValue(dataType, dictionaryId)); } else { - lastValue = readPrimitiveTypedRow(dataType); + lastValue.setValue(readPrimitiveTypedRow(dataType)); } + } else if (definitionLevel == maxDefLevel - 1) { + lastValue.setValue(null); } else { - lastValue = null; + lastValue.skip(); } return true; } else { @@ -641,4 +662,18 @@ public int nextInt() { return 0; } } + + private static class LastValueContainer { + protected boolean shouldSkip; + protected Object value; + + protected void setValue(Object value) { + this.value = value; + this.shouldSkip = false; + } + + protected void skip() { + this.shouldSkip = true; + } + } } From 7a1bca39ae8db751c34b5ec87e9c46a4d416e5ec Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sat, 3 Aug 2024 16:52:53 +0800 Subject: [PATCH 2/4] fix --- .../parquet/reader/NestedColumnReader.java | 37 +++++++----- .../parquet/reader/NestedPositionUtil.java | 3 +- .../reader/NestedPrimitiveColumnReader.java | 20 +++++-- .../format/parquet/ParquetReadWriteTest.java | 57 ++++--------------- 4 files changed, 53 insertions(+), 64 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index e3900580034b..165527adc688 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -86,11 +86,15 @@ public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, ParquetFi @Override public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { - readData(field, readNumber, vector, false); + readData(field, readNumber, vector, false, false); } private Pair readData( - ParquetField field, int readNumber, ColumnVector vector, boolean inside) + ParquetField field, + int readNumber, + ColumnVector vector, + boolean inside, + boolean parentIsRowType) throws IOException { if (field.getType() instanceof RowType) { return readRow((ParquetGroupField) field, readNumber, vector, inside); @@ -99,7 +103,8 @@ private Pair readData( } else if (field.getType() instanceof ArrayType) { return readArray((ParquetGroupField) field, readNumber, vector, inside); } else { - return readPrimitive((ParquetPrimitiveField) field, readNumber, vector); + return readPrimitive( + (ParquetPrimitiveField) field, readNumber, vector, parentIsRowType); } } @@ -114,7 +119,7 @@ private Pair readRow( new WritableColumnVector[childrenVectors.length]; for (int i = 0; i < children.size(); i++) { Pair tuple = - readData(children.get(i), readNumber, childrenVectors[i], true); + readData(children.get(i), readNumber, childrenVectors[i], true, true); levelDelegation = tuple.getLeft(); finalChildrenVectors[i] = tuple.getRight(); } @@ -139,7 +144,7 @@ private Pair readRow( } if (rowPosition.getIsNull() != null) { - setFieldNullFalg(rowPosition.getIsNull(), heapRowVector); + setFieldNullFlag(rowPosition.getIsNull(), heapRowVector); } return Pair.of(levelDelegation, heapRowVector); } @@ -155,9 +160,10 @@ private Pair readMap( "Maps must have two type parameters, found %s", children.size()); Pair keyTuple = - readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true); + readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true, false); Pair valueTuple = - readData(children.get(1), readNumber, mapVector.getValueColumnVector(), true); + readData( + children.get(1), readNumber, mapVector.getValueColumnVector(), true, false); LevelDelegation levelDelegation = keyTuple.getLeft(); @@ -181,7 +187,7 @@ private Pair readMap( } if (collectionPosition.getIsNull() != null) { - setFieldNullFalg(collectionPosition.getIsNull(), mapVector); + setFieldNullFlag(collectionPosition.getIsNull(), mapVector); } mapVector.setLengths(collectionPosition.getLength()); @@ -201,7 +207,7 @@ private Pair readArray( "Arrays must have a single type parameter, found %s", children.size()); Pair tuple = - readData(children.get(0), readNumber, arrayVector.getChild(), true); + readData(children.get(0), readNumber, arrayVector.getChild(), true, false); LevelDelegation levelDelegation = tuple.getLeft(); CollectionPosition collectionPosition = @@ -219,7 +225,7 @@ private Pair readArray( } if (collectionPosition.getIsNull() != null) { - setFieldNullFalg(collectionPosition.getIsNull(), arrayVector); + setFieldNullFlag(collectionPosition.getIsNull(), arrayVector); } arrayVector.setLengths(collectionPosition.getLength()); arrayVector.setOffsets(collectionPosition.getOffsets()); @@ -227,7 +233,11 @@ private Pair readArray( } private Pair readPrimitive( - ParquetPrimitiveField field, int readNumber, ColumnVector vector) throws IOException { + ParquetPrimitiveField field, + int readNumber, + ColumnVector vector, + boolean parentIsRowType) + throws IOException { ColumnDescriptor descriptor = field.getDescriptor(); NestedPrimitiveColumnReader reader = columnReaders.get(descriptor); if (reader == null) { @@ -237,7 +247,8 @@ private Pair readPrimitive( pages.getPageReader(descriptor), isUtcTimestamp, descriptor.getPrimitiveType(), - field.getType()); + field.getType(), + parentIsRowType); columnReaders.put(descriptor, reader); } WritableColumnVector writableColumnVector = @@ -245,7 +256,7 @@ private Pair readPrimitive( return Pair.of(reader.getLevelDelegation(), writableColumnVector); } - private static void setFieldNullFalg(boolean[] nullFlags, AbstractHeapVector vector) { + private static void setFieldNullFlag(boolean[] nullFlags, AbstractHeapVector vector) { for (int index = 0; index < vector.getLen() && index < nullFlags.length; index++) { if (nullFlags[index]) { vector.setNullAt(index); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java index 1c8298eaeb6a..62338278512a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java @@ -108,6 +108,8 @@ public static CollectionPosition calculateCollectionOffsets( i < definitionLevels.length; i = getNextCollectionStartIndex(repetitionLevels, collectionRepetitionLevel, i)) { if (definitionLevels[i] >= collectionDefinitionLevel - 1) { + valueCount++; + boolean isNull = isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel); if (isNull) { @@ -142,7 +144,6 @@ public static CollectionPosition calculateCollectionOffsets( collectionDefinitionLevel)); } offsets.add(offset); - valueCount++; } // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the // collection is not defined, just ignore it diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 0b9fdb1f6a1d..7837eb8148e9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -73,12 +73,13 @@ public class NestedPrimitiveColumnReader implements ColumnReader [5, 4, 5, 3, 2, 1, 0] + * An ARRAY[ARRAY[INT]] Example: {[[0, null], [1], [], null], [], null} => [5, 4, 5, 3, 2, 1, 0] * *