15 changes: 15 additions & 0 deletions paimon-format/src/main/java/org/apache/orc/OrcConf.java
@@ -305,6 +305,21 @@ public enum OrcConf {
+ "must have the filter\n"
+ "reapplied to avoid using unset values in the unselected rows.\n"
+ "If unsure please leave this as false."),

READER_ONLY_ALLOW_SARG_TO_FILTER(
"orc.reader.sarg.to.filter",
"orc.reader.sarg.to.filter",
false,
"A boolean flag to determine if a SArg is allowed to become a filter, only for reader."),
READER_ONLY_USE_SELECTED(
"orc.reader.filter.use.selected",
"orc.reader.filter.use.selected",
false,
"A boolean flag to determine if the selected vector is supported by\n"
+ "the reading application, only for reader. If false, the output of the ORC reader "
+ "must have the filter\n"
+ "reapplied to avoid using unset values in the unselected rows.\n"
+ "If unsure please leave this as false."),
ALLOW_PLUGIN_FILTER(
"orc.filter.plugin",
"orc.filter.plugin",
@@ -101,7 +101,6 @@ public Optional<SimpleStatsExtractor> createStatsExtractor(
     public FormatReaderFactory createReaderFactory(
             RowType projectedRowType, @Nullable List<Predicate> filters) {
         List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
-
         if (filters != null) {
             for (Predicate pred : filters) {
                 Optional<OrcFilters.Predicate> orcPred =
@@ -123,7 +123,9 @@ public OrcReaderBatch createReaderBatch(
         for (int i = 0; i < vectors.length; i++) {
             String name = tableFieldNames.get(i);
             DataType type = tableFieldTypes.get(i);
-            vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+            vectors[i] =
+                    createPaimonVector(
+                            orcBatch.cols[tableFieldNames.indexOf(name)], orcBatch, type);
         }
         return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler);
     }
@@ -268,7 +270,13 @@ private static RecordReader createRecordReader(
                         .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
                         .tolerateMissingSchema(
                                 OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
-
+        if (!conjunctPredicates.isEmpty()) {
+            // TODO: fix this. If this option is enabled, future deletion vectors will not work,
+            // because getRowNumber would be changed.
+            options.useSelected(OrcConf.READER_ONLY_USE_SELECTED.getBoolean(conf));
+            options.allowSARGToFilter(
+                    OrcConf.READER_ONLY_ALLOW_SARG_TO_FILTER.getBoolean(conf));
+        }
         // configure filters
         if (!conjunctPredicates.isEmpty()) {
             SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
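
These two `Reader.Options` flags follow the contract described in the option text above: `allowSARGToFilter` lets the reader re-apply the SearchArgument row by row, and `useSelected` makes it report surviving rows through the batch's `selected` array instead of compacting the batch. A hedged sketch of the loop a consumer must then run, using only public `VectorizedRowBatch` fields from `hive-storage-api` (the class and method names are illustrative):

```java
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public final class SelectedVectorExample {

    /** Sums column 0 of a batch, honoring the 'selected' indirection when present. */
    static long sumColumn(VectorizedRowBatch batch) {
        LongColumnVector col = (LongColumnVector) batch.cols[0];
        long sum = 0;
        for (int r = 0; r < batch.size; r++) {
            // Logical position r maps to physical row selected[r] when selectedInUse is set.
            int row = batch.selectedInUse ? batch.selected[r] : r;
            if (col.isRepeating) {
                row = 0; // a repeating vector keeps its single value at index 0
            }
            if (col.noNulls || !col.isNull[row]) {
                sum += col.vector[row];
            }
        }
        return sum;
    }
}
```

This indirection is also why the TODO above matters: once the reader filters rows internally, `getRowNumber()` no longer matches positions computed by counting returned rows, which breaks position-based deletion vectors.
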
@@ -33,44 +33,57 @@
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /** This column vector is used to adapt hive's ColumnVector to Paimon's ColumnVector. */
 public abstract class AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.ColumnVector {
 
     private final ColumnVector vector;
 
-    AbstractOrcColumnVector(ColumnVector vector) {
+    private final VectorizedRowBatch orcBatch;
+
+    AbstractOrcColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) {
         this.vector = vector;
+        this.orcBatch = orcBatch;
     }
 
+    protected int rowMapper(int r) {
+        if (vector.isRepeating) {
+            return 0;
+        }
+        return this.orcBatch.selectedInUse ? this.orcBatch.getSelected()[r] : r;
+    }
+
     @Override
     public boolean isNullAt(int i) {
-        return !vector.noNulls && vector.isNull[vector.isRepeating ? 0 : i];
+        return !vector.noNulls && vector.isNull[rowMapper(i)];
     }
 
     public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector(
-            ColumnVector vector, DataType dataType) {
+            ColumnVector vector, VectorizedRowBatch orcBatch, DataType dataType) {
         if (vector instanceof LongColumnVector) {
             if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
-                return new OrcLegacyTimestampColumnVector((LongColumnVector) vector);
+                return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, orcBatch);
             } else {
-                return new OrcLongColumnVector((LongColumnVector) vector);
+                return new OrcLongColumnVector((LongColumnVector) vector, orcBatch);
             }
         } else if (vector instanceof DoubleColumnVector) {
-            return new OrcDoubleColumnVector((DoubleColumnVector) vector);
+            return new OrcDoubleColumnVector((DoubleColumnVector) vector, orcBatch);
         } else if (vector instanceof BytesColumnVector) {
-            return new OrcBytesColumnVector((BytesColumnVector) vector);
+            return new OrcBytesColumnVector((BytesColumnVector) vector, orcBatch);
         } else if (vector instanceof DecimalColumnVector) {
-            return new OrcDecimalColumnVector((DecimalColumnVector) vector);
+            return new OrcDecimalColumnVector((DecimalColumnVector) vector, orcBatch);
         } else if (vector instanceof TimestampColumnVector) {
-            return new OrcTimestampColumnVector(vector);
+            return new OrcTimestampColumnVector(vector, orcBatch);
         } else if (vector instanceof ListColumnVector) {
-            return new OrcArrayColumnVector((ListColumnVector) vector, (ArrayType) dataType);
+            return new OrcArrayColumnVector(
+                    (ListColumnVector) vector, orcBatch, (ArrayType) dataType);
        } else if (vector instanceof StructColumnVector) {
-            return new OrcRowColumnVector((StructColumnVector) vector, (RowType) dataType);
+            return new OrcRowColumnVector(
+                    (StructColumnVector) vector, orcBatch, (RowType) dataType);
         } else if (vector instanceof MapColumnVector) {
-            return new OrcMapColumnVector((MapColumnVector) vector, (MapType) dataType);
+            return new OrcMapColumnVector((MapColumnVector) vector, orcBatch, (MapType) dataType);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported vector: " + vector.getClass().getName());
@@ -24,6 +24,7 @@
 import org.apache.paimon.types.ArrayType;
 
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /** This column vector is used to adapt hive's ListColumnVector to Paimon's ArrayColumnVector. */
 public class OrcArrayColumnVector extends AbstractOrcColumnVector
@@ -32,14 +33,16 @@ public class OrcArrayColumnVector extends AbstractOrcColumnVector
     private final ListColumnVector hiveVector;
     private final ColumnVector paimonVector;
 
-    public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) {
-        super(hiveVector);
+    public OrcArrayColumnVector(
+            ListColumnVector hiveVector, VectorizedRowBatch orcBatch, ArrayType type) {
+        super(hiveVector, orcBatch);
         this.hiveVector = hiveVector;
-        this.paimonVector = createPaimonVector(hiveVector.child, type.getElementType());
+        this.paimonVector = createPaimonVector(hiveVector.child, orcBatch, type.getElementType());
     }
 
     @Override
     public InternalArray getArray(int i) {
+        i = rowMapper(i);
         long offset = hiveVector.offsets[i];
         long length = hiveVector.lengths[i];
         return new ColumnarArray(paimonVector, (int) offset, (int) length);
@@ -19,21 +19,22 @@
 package org.apache.paimon.format.orc.reader;
 
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /** This column vector is used to adapt hive's BytesColumnVector to Paimon's BytesColumnVector. */
 public class OrcBytesColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.BytesColumnVector {
 
     private final BytesColumnVector vector;
 
-    public OrcBytesColumnVector(BytesColumnVector vector) {
-        super(vector);
+    public OrcBytesColumnVector(BytesColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
         this.vector = vector;
     }
 
     @Override
     public Bytes getBytes(int i) {
-        int rowId = vector.isRepeating ? 0 : i;
+        int rowId = rowMapper(i);
         byte[][] data = vector.vector;
         int[] start = vector.start;
         int[] length = vector.length;
@@ -21,6 +21,7 @@
 import org.apache.paimon.data.Decimal;
 
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 import java.math.BigDecimal;
 
@@ -32,15 +33,15 @@ public class OrcDecimalColumnVector extends AbstractOrcColumnVector

     private final DecimalColumnVector vector;
 
-    public OrcDecimalColumnVector(DecimalColumnVector vector) {
-        super(vector);
+    public OrcDecimalColumnVector(DecimalColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
         this.vector = vector;
     }
 
     @Override
     public Decimal getDecimal(int i, int precision, int scale) {
-        BigDecimal data =
-                vector.vector[vector.isRepeating ? 0 : i].getHiveDecimal().bigDecimalValue();
+        i = rowMapper(i);
+        BigDecimal data = vector.vector[i].getHiveDecimal().bigDecimalValue();
         return Decimal.fromBigDecimal(data, precision, scale);
     }
 }
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.orc.reader;
 
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /**
  * This column vector is used to adapt hive's DoubleColumnVector to Paimon's float and double
@@ -30,18 +31,20 @@ public class OrcDoubleColumnVector extends AbstractOrcColumnVector

     private final DoubleColumnVector vector;
 
-    public OrcDoubleColumnVector(DoubleColumnVector vector) {
-        super(vector);
+    public OrcDoubleColumnVector(DoubleColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
         this.vector = vector;
     }
 
     @Override
     public double getDouble(int i) {
-        return vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return vector.vector[i];
     }
 
     @Override
     public float getFloat(int i) {
-        return (float) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (float) vector.vector[i];
     }
 }
@@ -22,6 +22,7 @@

 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 import java.time.LocalDateTime;
 
@@ -34,15 +35,15 @@ public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector

     private final LongColumnVector hiveVector;
 
-    OrcLegacyTimestampColumnVector(LongColumnVector vector) {
-        super(vector);
+    OrcLegacyTimestampColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
         this.hiveVector = vector;
     }
 
     @Override
     public Timestamp getTimestamp(int i, int precision) {
-        int index = hiveVector.isRepeating ? 0 : i;
-        java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[index]);
+        i = rowMapper(i);
+        java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[i]);
         return Timestamp.fromSQLTimestamp(timestamp);
     }
 
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.orc.reader;
 
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /**
  * This column vector is used to adapt hive's LongColumnVector to Paimon's boolean, byte, short, int
@@ -33,33 +34,38 @@ public class OrcLongColumnVector extends AbstractOrcColumnVector

     private final LongColumnVector vector;
 
-    public OrcLongColumnVector(LongColumnVector vector) {
-        super(vector);
+    public OrcLongColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) {
+        super(vector, orcBatch);
         this.vector = vector;
     }
 
     @Override
     public long getLong(int i) {
-        return vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return vector.vector[i];
     }
 
     @Override
     public boolean getBoolean(int i) {
-        return vector.vector[vector.isRepeating ? 0 : i] == 1;
+        i = rowMapper(i);
+        return vector.vector[i] == 1;
     }
 
     @Override
     public byte getByte(int i) {
-        return (byte) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (byte) vector.vector[i];
     }
 
     @Override
     public int getInt(int i) {
-        return (int) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (int) vector.vector[i];
     }
 
     @Override
     public short getShort(int i) {
-        return (short) vector.vector[vector.isRepeating ? 0 : i];
+        i = rowMapper(i);
+        return (short) vector.vector[i];
     }
 }
@@ -24,6 +24,7 @@
 import org.apache.paimon.types.MapType;
 
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /** This column vector is used to adapt hive's MapColumnVector to Paimon's MapColumnVector. */
 public class OrcMapColumnVector extends AbstractOrcColumnVector
@@ -33,15 +34,18 @@ public class OrcMapColumnVector extends AbstractOrcColumnVector
     private final ColumnVector keyPaimonVector;
     private final ColumnVector valuePaimonVector;
 
-    public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) {
-        super(hiveVector);
+    public OrcMapColumnVector(
+            MapColumnVector hiveVector, VectorizedRowBatch orcBatch, MapType type) {
+        super(hiveVector, orcBatch);
         this.hiveVector = hiveVector;
-        this.keyPaimonVector = createPaimonVector(hiveVector.keys, type.getKeyType());
-        this.valuePaimonVector = createPaimonVector(hiveVector.values, type.getValueType());
+        this.keyPaimonVector = createPaimonVector(hiveVector.keys, orcBatch, type.getKeyType());
+        this.valuePaimonVector =
+                createPaimonVector(hiveVector.values, orcBatch, type.getValueType());
     }
 
     @Override
     public InternalMap getMap(int i) {
+        i = rowMapper(i);
         long offset = hiveVector.offsets[i];
         long length = hiveVector.lengths[i];
         return new ColumnarMap(keyPaimonVector, valuePaimonVector, (int) offset, (int) length);
@@ -24,25 +24,29 @@
 import org.apache.paimon.types.RowType;
 
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /** This column vector is used to adapt hive's StructColumnVector to Paimon's RowColumnVector. */
 public class OrcRowColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.RowColumnVector {
 
     private final VectorizedColumnBatch batch;
 
-    public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
-        super(hiveVector);
+    public OrcRowColumnVector(
+            StructColumnVector hiveVector, VectorizedRowBatch orcBatch, RowType type) {
+        super(hiveVector, orcBatch);
         int len = hiveVector.fields.length;
         ColumnVector[] paimonVectors = new ColumnVector[len];
         for (int i = 0; i < len; i++) {
-            paimonVectors[i] = createPaimonVector(hiveVector.fields[i], type.getTypeAt(i));
+            paimonVectors[i] =
+                    createPaimonVector(hiveVector.fields[i], orcBatch, type.getTypeAt(i));
         }
         this.batch = new VectorizedColumnBatch(paimonVectors);
     }
 
     @Override
     public ColumnarRow getRow(int i) {
+        i = rowMapper(i);
         return new ColumnarRow(batch, i);
     }
 