NestedColumnReader.java
@@ -26,7 +26,6 @@
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.parquet.position.CollectionPosition;
import org.apache.paimon.format.parquet.position.LevelDelegation;
import org.apache.paimon.format.parquet.position.RowPosition;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetGroupField;
import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
@@ -41,6 +40,7 @@
import org.apache.parquet.column.page.PageReadStore;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,66 +86,87 @@ public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, ParquetFi

@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
readData(field, readNumber, vector, false);
readData(field, readNumber, vector, false, false, false);
}

private Pair<LevelDelegation, WritableColumnVector> readData(
ParquetField field, int readNumber, ColumnVector vector, boolean inside)
ParquetField field,
int readNumber,
ColumnVector vector,
boolean inside,
boolean readRowField,
boolean readMapKey)
throws IOException {
if (field.getType() instanceof RowType) {
return readRow((ParquetGroupField) field, readNumber, vector, inside);
} else if (field.getType() instanceof MapType || field.getType() instanceof MultisetType) {
return readMap((ParquetGroupField) field, readNumber, vector, inside);
return readMap((ParquetGroupField) field, readNumber, vector, inside, readRowField);
} else if (field.getType() instanceof ArrayType) {
return readArray((ParquetGroupField) field, readNumber, vector, inside);
return readArray((ParquetGroupField) field, readNumber, vector, inside, readRowField);
} else {
return readPrimitive((ParquetPrimitiveField) field, readNumber, vector);
return readPrimitive(
(ParquetPrimitiveField) field, readNumber, vector, readRowField, readMapKey);
}
}

private Pair<LevelDelegation, WritableColumnVector> readRow(
ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
throws IOException {
HeapRowVector heapRowVector = (HeapRowVector) vector;
LevelDelegation levelDelegation = null;
LevelDelegation longest = null;
List<ParquetField> children = field.getChildren();
WritableColumnVector[] childrenVectors = heapRowVector.getFields();
WritableColumnVector[] finalChildrenVectors =
new WritableColumnVector[childrenVectors.length];
for (int i = 0; i < children.size(); i++) {
Pair<LevelDelegation, WritableColumnVector> tuple =
readData(children.get(i), readNumber, childrenVectors[i], true);
levelDelegation = tuple.getLeft();
readData(children.get(i), readNumber, childrenVectors[i], true, true, false);
LevelDelegation current = tuple.getLeft();
if (longest == null) {
longest = current;
} else if (current.getDefinitionLevel().length > longest.getDefinitionLevel().length) {
longest = current;
}
finalChildrenVectors[i] = tuple.getRight();
}
if (levelDelegation == null) {
if (longest == null) {
throw new RuntimeException(
String.format("Row field does not have any children: %s.", field));
}

RowPosition rowPosition =
NestedPositionUtil.calculateRowOffsets(
field,
levelDelegation.getDefinitionLevel(),
levelDelegation.getRepetitionLevel());
int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen();
boolean[] isNull = new boolean[len];
Arrays.fill(isNull, true);
boolean hasNull = false;
for (int i = 0; i < len; i++) {
for (WritableColumnVector child : finalChildrenVectors) {
isNull[i] = isNull[i] && child.isNullAt(i);
}
if (isNull[i]) {
hasNull = true;
}
}

// If row was inside the structure, then we need to renew the vector to reset the
// capacity.
if (inside) {
heapRowVector =
new HeapRowVector(rowPosition.getPositionsCount(), finalChildrenVectors);
heapRowVector = new HeapRowVector(len, finalChildrenVectors);
} else {
heapRowVector.setFields(finalChildrenVectors);
}

if (rowPosition.getIsNull() != null) {
setFieldNullFalg(rowPosition.getIsNull(), heapRowVector);
if (hasNull) {
setFieldNullFlag(isNull, heapRowVector);
}
return Pair.of(levelDelegation, heapRowVector);
return Pair.of(longest, heapRowVector);
}

private Pair<LevelDelegation, WritableColumnVector> readMap(
ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
ParquetGroupField field,
int readNumber,
ColumnVector vector,
boolean inside,
boolean readRowField)
throws IOException {
HeapMapVector mapVector = (HeapMapVector) vector;
mapVector.reset();
@@ -155,17 +176,30 @@ private Pair<LevelDelegation, WritableColumnVector> readMap(
"Maps must have two type parameters, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> keyTuple =
readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true);
readData(
children.get(0),
readNumber,
mapVector.getKeyColumnVector(),
true,
false,
true);
Pair<LevelDelegation, WritableColumnVector> valueTuple =
readData(children.get(1), readNumber, mapVector.getValueColumnVector(), true);
readData(
children.get(1),
readNumber,
mapVector.getValueColumnVector(),
true,
false,
false);

LevelDelegation levelDelegation = keyTuple.getLeft();

CollectionPosition collectionPosition =
NestedPositionUtil.calculateCollectionOffsets(
field,
levelDelegation.getDefinitionLevel(),
levelDelegation.getRepetitionLevel());
levelDelegation.getRepetitionLevel(),
readRowField);

// If map was inside the structure, then we need to renew the vector to reset the
// capacity.
@@ -181,7 +215,7 @@ private Pair<LevelDelegation, WritableColumnVector> readMap(
}

if (collectionPosition.getIsNull() != null) {
setFieldNullFalg(collectionPosition.getIsNull(), mapVector);
setFieldNullFlag(collectionPosition.getIsNull(), mapVector);
}

mapVector.setLengths(collectionPosition.getLength());
@@ -191,7 +225,11 @@ private Pair<LevelDelegation, WritableColumnVector> readMap(
}

private Pair<LevelDelegation, WritableColumnVector> readArray(
ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
ParquetGroupField field,
int readNumber,
ColumnVector vector,
boolean inside,
boolean readRowField)
throws IOException {
HeapArrayVector arrayVector = (HeapArrayVector) vector;
arrayVector.reset();
@@ -201,14 +239,15 @@ private Pair<LevelDelegation, WritableColumnVector> readArray(
"Arrays must have a single type parameter, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> tuple =
readData(children.get(0), readNumber, arrayVector.getChild(), true);
readData(children.get(0), readNumber, arrayVector.getChild(), true, false, false);

LevelDelegation levelDelegation = tuple.getLeft();
CollectionPosition collectionPosition =
NestedPositionUtil.calculateCollectionOffsets(
field,
levelDelegation.getDefinitionLevel(),
levelDelegation.getRepetitionLevel());
levelDelegation.getRepetitionLevel(),
readRowField);

// If array was inside the structure, then we need to renew the vector to reset the
// capacity.
@@ -219,15 +258,20 @@ private Pair<LevelDelegation, WritableColumnVector> readArray(
}

if (collectionPosition.getIsNull() != null) {
setFieldNullFalg(collectionPosition.getIsNull(), arrayVector);
setFieldNullFlag(collectionPosition.getIsNull(), arrayVector);
}
arrayVector.setLengths(collectionPosition.getLength());
arrayVector.setOffsets(collectionPosition.getOffsets());
return Pair.of(levelDelegation, arrayVector);
}

private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
ParquetPrimitiveField field, int readNumber, ColumnVector vector) throws IOException {
ParquetPrimitiveField field,
int readNumber,
ColumnVector vector,
boolean readRowField,
boolean readMapKey)
throws IOException {
ColumnDescriptor descriptor = field.getDescriptor();
NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
if (reader == null) {
@@ -237,15 +281,17 @@ private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
pages.getPageReader(descriptor),
isUtcTimestamp,
descriptor.getPrimitiveType(),
field.getType());
field.getType(),
readRowField,
readMapKey);
columnReaders.put(descriptor, reader);
}
WritableColumnVector writableColumnVector =
reader.readAndNewVector(readNumber, (WritableColumnVector) vector);
return Pair.of(reader.getLevelDelegation(), writableColumnVector);
}

private static void setFieldNullFalg(boolean[] nullFlags, AbstractHeapVector vector) {
private static void setFieldNullFlag(boolean[] nullFlags, AbstractHeapVector vector) {
for (int index = 0; index < vector.getLen() && index < nullFlags.length; index++) {
if (nullFlags[index]) {
vector.setNullAt(index);
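Why the new flags matter (a hedged sketch; the exact levels depend on the schema, and the `row`/`array` names below are hypothetical): when a field is read as the direct child of a struct, a null struct surfaces in that child's level stream at `collectionDefinitionLevel - 2`, which is the case the new `readRowField` branch in NestedPositionUtil handles. `readMapKey` is threaded through to NestedPrimitiveColumnReader, presumably because Parquet map keys are required and can never be null.

// Hypothetical schema: ROW<ARRAY<INT>> with all parts optional.
// optional group row {
//   optional group array (LIST) {
//     repeated group list {
//       optional int32 element;
//     }
//   }
// }
// For the array child, assume field.getDefinitionLevel() == 2. Then:
//   d == 3 or 4 -> array defined and not empty
//   d == 2      -> array defined but empty
//   d == 1      -> array is null
//   d == 0      -> the enclosing row itself is null (collectionDefinitionLevel - 2)
// With readRowField == true, the d == 0 case now records an explicit null entry, so the
// child vectors stay index-aligned with the row vector; readRow can then derive row
// nullability by intersecting the null flags of all children instead of calling the
// removed calculateRowOffsets.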
NestedPositionUtil.java
@@ -19,7 +19,6 @@
package org.apache.paimon.format.parquet.reader;

import org.apache.paimon.format.parquet.position.CollectionPosition;
import org.apache.paimon.format.parquet.position.RowPosition;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.utils.BooleanArrayList;
import org.apache.paimon.utils.LongArrayList;
@@ -29,50 +28,6 @@
/** Utils to calculate nested type position. */
public class NestedPositionUtil {

/**
* Calculate row offsets according to column's max repetition level, definition level, value's
* repetition level and definition level. Each row has three situation:
* <li>Row is not defined,because it's optional parent fields is null, this is decided by its
* parent's repetition level
* <li>Row is null
* <li>Row is defined and not empty.
*
* @param field field that contains the row column message include max repetition level and
* definition level.
* @param fieldRepetitionLevels int array with each value's repetition level.
* @param fieldDefinitionLevels int array with each value's definition level.
* @return {@link RowPosition} contains collections row count and isNull array.
*/
public static RowPosition calculateRowOffsets(
ParquetField field, int[] fieldDefinitionLevels, int[] fieldRepetitionLevels) {
int rowDefinitionLevel = field.getDefinitionLevel();
int rowRepetitionLevel = field.getRepetitionLevel();
int nullValuesCount = 0;
BooleanArrayList nullRowFlags = new BooleanArrayList(0);
for (int i = 0; i < fieldDefinitionLevels.length; i++) {
if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
throw new IllegalStateException(
format(
"In parquet's row type field repetition level should not larger than row's repetition level. "
+ "Row repetition level is %s, row field repetition level is %s.",
rowRepetitionLevel, fieldRepetitionLevels[i]));
}

if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
// current row is defined and not empty
nullRowFlags.add(false);
} else {
// current row is null
nullRowFlags.add(true);
nullValuesCount++;
}
}
if (nullValuesCount == 0) {
return new RowPosition(null, fieldDefinitionLevels.length);
}
return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
}

/**
* Calculate the collection's offsets according to column's max repetition level, definition
* level, value's repetition level and definition level. Each collection (Array or Map) has four
@@ -92,7 +47,10 @@ public static RowPosition calculateRowOffsets(
* array.
*/
public static CollectionPosition calculateCollectionOffsets(
ParquetField field, int[] definitionLevels, int[] repetitionLevels) {
ParquetField field,
int[] definitionLevels,
int[] repetitionLevels,
boolean readRowField) {
int collectionDefinitionLevel = field.getDefinitionLevel();
int collectionRepetitionLevel = field.getRepetitionLevel() + 1;
int offset = 0;
@@ -105,36 +63,42 @@ public static CollectionPosition calculateCollectionOffsets(
for (int i = 0;
i < definitionLevels.length;
i = getNextCollectionStartIndex(repetitionLevels, collectionRepetitionLevel, i)) {
valueCount++;
if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
boolean isNull =
isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel);
nullCollectionFlags.add(isNull);
nullValuesCount += isNull ? 1 : 0;
// definitionLevels[i] > collectionDefinitionLevel => Collection is defined and not
// empty
// definitionLevels[i] == collectionDefinitionLevel => Collection is defined but
// empty
// definitionLevels[i] == collectionDefinitionLevel - 1 => Collection is defined but
// null
if (definitionLevels[i] > collectionDefinitionLevel) {
nullCollectionFlags.add(false);
emptyCollectionFlags.add(false);
offset += getCollectionSize(repetitionLevels, collectionRepetitionLevel, i + 1);
} else if (definitionLevels[i] == collectionDefinitionLevel) {
offset++;
nullCollectionFlags.add(false);
emptyCollectionFlags.add(true);
// don't increase offset for empty values
} else {
offset++;
nullCollectionFlags.add(true);
nullValuesCount++;
// 1. don't increase offset for null values
// 2. offsets and emptyCollectionFlags are meaningless for null values, but they
// must be set at each index for calculating lengths later
emptyCollectionFlags.add(false);
}
offsets.add(offset);
} else {
// when definitionLevels[i] < collectionDefinitionLevel - 1, it means the collection
// is
// not defined, but we need to regard it as null to avoid getting value wrong.
valueCount++;
} else if (definitionLevels[i] == collectionDefinitionLevel - 2 && readRowField) {
// row field should store null value
nullCollectionFlags.add(true);
nullValuesCount++;
offsets.add(++offset);
emptyCollectionFlags.add(false);

offsets.add(offset);
valueCount++;
}
// else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the
// collection is not defined, just ignore it
}
long[] offsetsArray = offsets.toArray();
long[] length = calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
@@ -145,10 +109,6 @@ public static CollectionPosition calculateCollectionOffsets(
nullCollectionFlags.toArray(), offsetsArray, length, valueCount);
}

public static boolean isOptionalFieldValueNull(int definitionLevel, int maxDefinitionLevel) {
return definitionLevel == maxDefinitionLevel - 1;
}

public static long[] calculateLengthByOffsets(
boolean[] collectionIsEmpty, long[] arrayOffsets) {
LongArrayList lengthList = new LongArrayList(arrayOffsets.length);
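A quick sanity walk-through of the rewritten branch logic in calculateCollectionOffsets, for a hypothetical top-level optional ARRAY<INT> (assumed: field.getDefinitionLevel() == 1 and field.getRepetitionLevel() == 0, so collectionRepetitionLevel == 1, with standard 3-level list encoding):

// Rows: [1, 2], [], null
// definitionLevels = {3, 3, 1, 0}, repetitionLevels = {0, 1, 0, 0}
//
// i = 0: d = 3 > 1        -> defined and not empty; offset += 2 -> offsets = {2}
//                            isNull = {false},        isEmpty = {false}
// i = 2: d = 1 == 1       -> defined but empty; offset stays 2  -> offsets = {2, 2}
//                            isNull = {false, false}, isEmpty = {false, true}
// i = 3: d = 0 == (1 - 1) -> null collection; offset stays 2    -> offsets = {2, 2, 2}
//                            isNull = {false, false, true}, isEmpty = {false, true, false}
// valueCount = 3; calculateLengthByOffsets then yields lengths = {2, 0, 0}.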