Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,20 @@ from https://parquet.apache.org/ version 1.14.0
paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantUtil.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/ParquetColumnVector.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/ParquetReadState.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/ParquetVectorUpdater.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/ParquetVectorUpdaterFactory.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/RowIndexGenerator.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedColumnReader.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedDeltaBinaryPackedReader.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedDeltaByteArrayReader.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedDeltaLengthByteArrayReader.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedParquetRecordReader.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedPlainValuesReader.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedReaderBase.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedRleValuesReader.java
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/VectorizedValuesReader.java
from https://spark.apache.org/ version 4.0.0-preview2

MIT License
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,15 +519,8 @@ public InternalMap getMap(int index) {
}

@Override
public ColumnVector getKeyColumnVector() {
init();
return keyColumnVector;
}

@Override
public ColumnVector getValueColumnVector() {
init();
return valueColumnVector;
public ColumnVector[] getChildren() {
return new ColumnVector[] {keyColumnVector, valueColumnVector};
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,12 @@ protected void doWrite(
ArrayChildWriteInfo arrayChildWriteInfo =
getArrayChildWriteInfo(pickedInColumn, startIndex, lengths);
keyWriter.write(
mapColumnVector.getKeyColumnVector(),
mapColumnVector.getChildren()[0],
arrayChildWriteInfo.pickedInColumn,
arrayChildWriteInfo.startIndex,
arrayChildWriteInfo.batchRows);
valueWriter.write(
mapColumnVector.getValueColumnVector(),
mapColumnVector.getChildren()[1],
arrayChildWriteInfo.pickedInColumn,
arrayChildWriteInfo.startIndex,
arrayChildWriteInfo.batchRows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,14 @@

/** Nullable column vector. Access data through specific subclasses. */
public interface ColumnVector {

/** Returns whether the entry at index {@code i} is null. */
boolean isNullAt(int i);

/**
* Returns the capacity of this vector. The default implementation returns
* {@link Integer#MAX_VALUE}; implementations may override it.
*/
default int getCapacity() {
return Integer.MAX_VALUE;
}

/**
* Returns the child vectors of this vector. The default implementation returns {@code null}
* (not an empty array), so callers must null-check the result unless the concrete type is
* known to override this method.
*/
default ColumnVector[] getChildren() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>

protected int num;
protected int nextPos;
protected long nextFilePos;
protected long[] positions;

public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
super(recycler);
Expand All @@ -51,17 +51,25 @@ public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable re
}

/**
 * Resets this iterator for the current batch, assigning consecutive positions starting at
 * {@code nextFilePos} to the rows of the batch, then delegates to {@link #reset(long[])}.
 *
 * @param nextFilePos position assigned to the first row of the batch
 */
public void reset(long nextFilePos) {
    final int rowCount = row.batch().getNumRows();
    final long[] rowPositions = new long[rowCount];
    for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
        // Row k of the batch gets position (start + k).
        rowPositions[rowIndex] = nextFilePos + rowIndex;
    }
    reset(rowPositions);
}

public void reset(long[] positions) {
assert positions.length == row.batch().getNumRows();
this.positions = positions;
this.num = row.batch().getNumRows();
this.nextPos = 0;
this.nextFilePos = nextFilePos;
}

@Nullable
@Override
public InternalRow next() {
if (nextPos < num) {
row.setRowId(nextPos++);
nextFilePos++;
return row;
} else {
return null;
Expand All @@ -70,7 +78,10 @@ public InternalRow next() {

@Override
public long returnedPosition() {
return nextFilePos - 1;
if (nextPos == 0) {
return positions[0] - 1;
}
return positions[nextPos - 1];
}

@Override
Expand All @@ -81,7 +92,7 @@ public Path filePath() {
protected ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(nextFilePos);
newIterator.reset(positions);
return newIterator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,4 @@
/** Map column vector. */
public interface MapColumnVector extends ColumnVector {
InternalMap getMap(int i);

ColumnVector getKeyColumnVector();

ColumnVector getValueColumnVector();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public VectorizedColumnBatch batch() {
protected VectorizedRowIterator copy(ColumnVector[] vectors) {
VectorizedRowIterator newIterator =
new VectorizedRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(nextFilePos);
newIterator.reset(positions);
return newIterator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data.columnar.heap;

import org.apache.paimon.data.columnar.ColumnVector;

import java.util.Arrays;

/**
 * Base class for vectors that have offsets and lengths.
 *
 * <p>Each index of this vector owns one entry in {@link #offsets} and one in {@link #lengths};
 * both arrays are allocated with the vector's capacity and can be replaced wholesale through
 * {@link #setOffsets(long[])} and {@link #setLengths(long[])}.
 */
public class AbstractArrayBasedVector extends AbstractStructVector {

    /** Per-index offsets. */
    protected long[] offsets;

    /** Per-index lengths. */
    protected long[] lengths;

    /**
     * Creates a vector with the given children and allocates the offset and length arrays.
     *
     * @param len initial size passed to the parent constructor
     * @param children child vectors passed to the parent constructor
     */
    public AbstractArrayBasedVector(int len, ColumnVector[] children) {
        super(len, children);
        // NOTE(review): 'capacity' is inherited; presumably initialized from 'len' by the
        // parent constructor chain - confirm.
        this.offsets = new long[capacity];
        this.lengths = new long[capacity];
    }

    /**
     * Stores the offset and length for a single index.
     *
     * @param index position to write; no bounds check is performed here
     * @param offset offset value to store
     * @param length length value to store
     */
    public void putOffsetLength(int index, long offset, long length) {
        offsets[index] = offset;
        lengths[index] = length;
    }

    /** Returns the backing offsets array itself (not a copy). */
    public long[] getOffsets() {
        return offsets;
    }

    /** Replaces the backing offsets array with the given array (no copy is made). */
    public void setOffsets(long[] offsets) {
        this.offsets = offsets;
    }

    /** Returns the backing lengths array itself (not a copy). */
    public long[] getLengths() {
        return lengths;
    }

    /** Replaces the backing lengths array with the given array (no copy is made). */
    public void setLengths(long[] lengths) {
        this.lengths = lengths;
    }

    /**
     * Grows the offset and length arrays so that each holds at least {@code newCapacity} entries.
     * Existing values are preserved; arrays that are already large enough are left untouched.
     *
     * @param newCapacity minimum number of entries required in each array
     */
    @Override
    void reserveForHeapVector(int newCapacity) {
        // The two arrays are checked independently: setOffsets/setLengths may have installed
        // arrays of different sizes, so the size of one says nothing about the other. Checking
        // only 'offsets' could leave 'lengths' too short, or truncate a longer 'lengths'.
        if (offsets.length < newCapacity) {
            offsets = Arrays.copyOf(offsets, newCapacity);
        }
        if (lengths.length < newCapacity) {
            lengths = Arrays.copyOf(lengths, newCapacity);
        }
    }

    /**
     * Resets the parent state, then clears the offsets and lengths. An array whose size no longer
     * matches the current capacity is reallocated; otherwise it is zero-filled in place.
     */
    @Override
    public void reset() {
        super.reset();
        if (offsets.length != capacity) {
            offsets = new long[capacity];
        } else {
            Arrays.fill(offsets, 0);
        }
        if (lengths.length != capacity) {
            lengths = new long[capacity];
        } else {
            Arrays.fill(lengths, 0);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,23 @@ public abstract class AbstractHeapVector extends AbstractWritableVector
/** Reusable column for ids of dictionary. */
protected HeapIntVector dictionaryIds;

private final int len;

public AbstractHeapVector(int len) {
isNull = new boolean[len];
this.len = len;
public AbstractHeapVector(int capacity) {
super(capacity);
isNull = new boolean[capacity];
}

/**
* Resets the column to default state. - fills the isNull array with false. - sets noNulls to
* true.
*/
/** Resets the column to default state. - fills the isNull array with false. */
@Override
public void reset() {
if (!noNulls) {
super.reset();
if (isNull.length != capacity) {
isNull = new boolean[capacity];
} else {
Arrays.fill(isNull, false);
}
noNulls = true;
if (dictionaryIds != null) {
dictionaryIds.reset();
}
}

@Override
Expand All @@ -90,7 +90,7 @@ public void fillWithNulls() {

@Override
public boolean isNullAt(int i) {
return !noNulls && isNull[i];
return isAllNull || (!noNulls && isNull[i]);
}

@Override
Expand Down Expand Up @@ -118,7 +118,12 @@ public HeapIntVector getDictionaryIds() {
}

@Override
public int getLen() {
return this.len;
protected void reserveInternal(int newCapacity) {
if (isNull.length < newCapacity) {
isNull = Arrays.copyOf(isNull, newCapacity);
}
reserveForHeapVector(newCapacity);
}

abstract void reserveForHeapVector(int newCapacity);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data.columnar.heap;

import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;

/** Abstract class for vectors that have children. */
public abstract class AbstractStructVector extends AbstractHeapVector
        implements WritableColumnVector {

    /** Child vectors handed in at construction time and exposed via {@link #getChildren()}. */
    protected ColumnVector[] children;

    /**
     * Creates a struct-like vector.
     *
     * @param capacity capacity passed to the parent constructor
     * @param children child vectors; stored as-is, without copying
     */
    public AbstractStructVector(int capacity, ColumnVector[] children) {
        super(capacity);
        this.children = children;
    }

    /** Resets the parent state, then resets every child that is a {@link WritableColumnVector}. */
    @Override
    public void reset() {
        super.reset();
        // Snapshot the array reference once, as an enhanced-for over the field would.
        final ColumnVector[] vectors = children;
        for (int i = 0; i < vectors.length; i++) {
            final ColumnVector vector = vectors[i];
            if (!(vector instanceof WritableColumnVector)) {
                // Read-only children carry no resettable state here.
                continue;
            }
            ((WritableColumnVector) vector).reset();
        }
    }

    /** Returns the child vector array itself (not a copy). */
    @Override
    public ColumnVector[] getChildren() {
        return children;
    }
}
Loading
Loading