Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import java.util.Arrays;

/** Heap vector that nullable shared structure. */
public abstract class AbstractHeapVector extends AbstractWritableVector {
public abstract class AbstractHeapVector extends AbstractWritableVector
implements ElementCountable {

public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

Expand Down Expand Up @@ -116,6 +117,7 @@ public HeapIntVector getDictionaryIds() {
return dictionaryIds;
}

@Override
public int getLen() {
return this.len;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,10 @@
* limitations under the License.
*/

package org.apache.paimon.format.parquet.position;
package org.apache.paimon.data.columnar.heap;

import javax.annotation.Nullable;
/** Container with a known number of elements. */
public interface ElementCountable {

/** To represent struct's position in repeated type. */
public class RowPosition {
@Nullable private final boolean[] isNull;
private final int positionsCount;

public RowPosition(boolean[] isNull, int positionsCount) {
this.isNull = isNull;
this.positionsCount = positionsCount;
}

public boolean[] getIsNull() {
return isNull;
}

public int getPositionsCount() {
return positionsCount;
}
int getLen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -573,6 +574,24 @@ public void testCountStarPK() {
validateCount1NotPushDown(sql);
}

@Test
public void testParquetRowDecimalAndTimestamp() {
sql(
"CREATE TABLE parquet_row_decimal(`row` ROW<f0 DECIMAL(2,1)>) WITH ('file.format' = 'parquet')");
sql("INSERT INTO parquet_row_decimal VALUES ( (ROW(1.2)) )");

assertThat(sql("SELECT * FROM parquet_row_decimal"))
.containsExactly(Row.of(Row.of(new BigDecimal("1.2"))));

sql(
"CREATE TABLE parquet_row_timestamp(`row` ROW<f0 TIMESTAMP(0)>) WITH ('file.format' = 'parquet')");
sql("INSERT INTO parquet_row_timestamp VALUES ( (ROW(TIMESTAMP'2024-11-13 18:00:00')) )");

assertThat(sql("SELECT * FROM parquet_row_timestamp"))
.containsExactly(
Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0))));
}

private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv, sql);
while (!transformation.getInputs().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
Expand Down Expand Up @@ -293,7 +294,10 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
for (int i = 0; i < writableVectors.length; i++) {
switch (projectedFields[i].type().getTypeRoot()) {
case DECIMAL:
vectors[i] = new ParquetDecimalVector(writableVectors[i]);
vectors[i] =
new ParquetDecimalVector(
writableVectors[i],
((ElementCountable) writableVectors[i]).getLen());
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.heap.AbstractHeapVector;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.heap.HeapArrayVector;
import org.apache.paimon.data.columnar.heap.HeapMapVector;
import org.apache.paimon.data.columnar.heap.HeapRowVector;
Expand Down Expand Up @@ -134,7 +135,7 @@ private Pair<LevelDelegation, WritableColumnVector> readRow(
String.format("Row field does not have any children: %s.", field));
}

int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen();
int len = ((ElementCountable) finalChildrenVectors[0]).getLen();
boolean[] isNull = new boolean[len];
Arrays.fill(isNull, true);
boolean hasNull = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
phiv.vector[i] = ((List<Integer>) valueList).get(i);
}
}
return new ParquetDecimalVector(phiv);
return new ParquetDecimalVector(phiv, total);
case INT64:
HeapLongVector phlv = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
Expand All @@ -505,10 +505,10 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
phlv.vector[i] = ((List<Long>) valueList).get(i);
}
}
return new ParquetDecimalVector(phlv);
return new ParquetDecimalVector(phlv, total);
default:
HeapBytesVector phbv = getHeapBytesVector(total, valueList);
return new ParquetDecimalVector(phbv);
return new ParquetDecimalVector(phbv, total);
}
default:
throw new RuntimeException("Unsupported type in the list: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.data.columnar.Dictionary;
import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.LongColumnVector;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
Expand All @@ -38,12 +39,18 @@
* {@link DecimalColumnVector} interface.
*/
public class ParquetDecimalVector
implements DecimalColumnVector, WritableLongVector, WritableIntVector, WritableBytesVector {
implements DecimalColumnVector,
WritableLongVector,
WritableIntVector,
WritableBytesVector,
ElementCountable {

private final ColumnVector vector;
private final int len;

public ParquetDecimalVector(ColumnVector vector) {
public ParquetDecimalVector(ColumnVector vector, int len) {
this.vector = vector;
this.len = len;
}

@Override
Expand Down Expand Up @@ -225,4 +232,9 @@ public void fill(long value) {
((WritableLongVector) vector).fill(value);
}
}

@Override
public int getLen() {
return len;
}
}

This file was deleted.