From c58cc8bb8ccafb9007b7b7e3399602c14de5698f Mon Sep 17 00:00:00 2001
From: Adam Szita
Date: Wed, 6 Oct 2021 15:35:04 +0200
Subject: [PATCH 01/11] refactor: conforming OrcValueWriter signatures

---
 .../spark/data/SparkOrcValueWriter.java       |  10 +-
 .../spark/data/SparkOrcValueWriters.java      | 117 +++++++++--------
 .../iceberg/spark/data/SparkOrcWriter.java    | 120 +++++++++++++++---
 3 files changed, 172 insertions(+), 75 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
index b4124468687f..b326f689bbc3 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
@@ -24,7 +24,7 @@
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
 
-interface SparkOrcValueWriter {
+interface SparkOrcValueWriter<T> {
 
   /**
    * Take a value from the data and add it to the ORC output.
@@ -34,17 +34,17 @@ interface SparkOrcValueWriter {
    * @param data the data value to write.
    * @param output the ColumnVector to put the value into.
    */
-  default void write(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-    if (data.isNullAt(column)) {
+  default void write(int rowId, T data, ColumnVector output) {
+    if (data == null) {
       output.noNulls = false;
       output.isNull[rowId] = true;
     } else {
       output.isNull[rowId] = false;
-      nonNullWrite(rowId, column, data, output);
+      nonNullWrite(rowId, data, output);
     }
   }
 
-  void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output);
+  void nonNullWrite(int rowId, T data, ColumnVector output);
 
   /**
    * Returns a stream of {@link FieldMetrics} that this SparkOrcValueWriter keeps track of.
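
The file above is the core of the refactor: the interface becomes generic in the value type, and the default write() centralizes the null bookkeeping so that implementations only ever see non-null values. As a concrete illustration of the new contract (example code, not part of the patch; it assumes a class placed in the same package as the patched interface):

    import org.apache.orc.storage.ql.exec.vector.ColumnVector;
    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

    // Illustrative only: an integer writer under the new signature. The caller
    // now extracts the (possibly null) value from the row, so the writer no
    // longer needs SpecializedGetters or a column ordinal.
    class ExampleIntWriter implements SparkOrcValueWriter<Integer> {
      @Override
      public void nonNullWrite(int rowId, Integer data, ColumnVector output) {
        // write() has already cleared output.isNull[rowId] before delegating here
        ((LongColumnVector) output).vector[rowId] = data;
      }
    }
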
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index df1b079bc7fa..14eda2403a83 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.data;
 
+import java.util.List;
 import java.util.stream.Stream;
 import org.apache.iceberg.DoubleFieldMetrics;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.FloatFieldMetrics;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
@@ -32,9 +34,12 @@
 import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
 import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
 
 class SparkOrcValueWriters {
   private SparkOrcValueWriters() {
@@ -88,60 +93,61 @@ static SparkOrcValueWriter decimal(int precision, int scale) {
     }
   }
 
-  static SparkOrcValueWriter list(SparkOrcValueWriter element) {
-    return new ListWriter(element);
+  static SparkOrcValueWriter list(SparkOrcValueWriter element, List<TypeDescription> orcType) {
+    return new ListWriter(element, orcType);
   }
 
-  static SparkOrcValueWriter map(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter) {
-    return new MapWriter(keyWriter, valueWriter);
+  static SparkOrcValueWriter map(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter,
+      List<TypeDescription> orcType) {
+    return new MapWriter(keyWriter, valueWriter, orcType);
   }
 
-  private static class BooleanWriter implements SparkOrcValueWriter {
+  private static class BooleanWriter implements SparkOrcValueWriter<Boolean> {
     private static final BooleanWriter INSTANCE = new BooleanWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
+    public void nonNullWrite(int rowId, Boolean data, ColumnVector output) {
+      ((LongColumnVector) output).vector[rowId] = data ? 1 : 0;
     }
   }
 
-  private static class ByteWriter implements SparkOrcValueWriter {
+  private static class ByteWriter implements SparkOrcValueWriter<Byte> {
     private static final ByteWriter INSTANCE = new ByteWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getByte(column);
+    public void nonNullWrite(int rowId, Byte data, ColumnVector output) {
+      ((LongColumnVector) output).vector[rowId] = data;
    }
   }
 
-  private static class ShortWriter implements SparkOrcValueWriter {
+  private static class ShortWriter implements SparkOrcValueWriter<Short> {
     private static final ShortWriter INSTANCE = new ShortWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getShort(column);
+    public void nonNullWrite(int rowId, Short data, ColumnVector output) {
+      ((LongColumnVector) output).vector[rowId] = data;
     }
   }
 
-  private static class IntWriter implements SparkOrcValueWriter {
+  private static class IntWriter implements SparkOrcValueWriter<Integer> {
     private static final IntWriter INSTANCE = new IntWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getInt(column);
+    public void nonNullWrite(int rowId, Integer data, ColumnVector output) {
+      ((LongColumnVector) output).vector[rowId] = data;
     }
   }
 
-  private static class LongWriter implements SparkOrcValueWriter {
+  private static class LongWriter implements SparkOrcValueWriter<Long> {
     private static final LongWriter INSTANCE = new LongWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getLong(column);
+    public void nonNullWrite(int rowId, Long data, ColumnVector output) {
+      ((LongColumnVector) output).vector[rowId] = data;
     }
   }
 
-  private static class FloatWriter implements SparkOrcValueWriter {
+  private static class FloatWriter implements SparkOrcValueWriter<Float> {
     private final FloatFieldMetrics.Builder floatFieldMetricsBuilder;
 
     private FloatWriter(int id) {
@@ -149,8 +155,7 @@ private FloatWriter(int id) {
     }
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      float floatValue = data.getFloat(column);
+    public void nonNullWrite(int rowId, Float floatValue, ColumnVector output) {
       ((DoubleColumnVector) output).vector[rowId] = floatValue;
       floatFieldMetricsBuilder.addValue(floatValue);
     }
@@ -161,7 +166,7 @@ public Stream<FieldMetrics<?>> metrics() {
     }
   }
 
-  private static class DoubleWriter implements SparkOrcValueWriter {
+  private static class DoubleWriter implements SparkOrcValueWriter<Double> {
     private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder;
 
     private DoubleWriter(int id) {
@@ -169,8 +174,7 @@ private DoubleWriter(int id) {
     }
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      double doubleValue = data.getDouble(column);
+    public void nonNullWrite(int rowId, Double doubleValue, ColumnVector output) {
       ((DoubleColumnVector) output).vector[rowId] = doubleValue;
       doubleFieldMetricsBuilder.addValue(doubleValue);
     }
@@ -181,41 +185,37 @@ public Stream<FieldMetrics<?>> metrics() {
     }
   }
 
-  private static class BytesWriter implements SparkOrcValueWriter {
+  private static class BytesWriter implements SparkOrcValueWriter<byte[]> {
     private static final BytesWriter INSTANCE =
new BytesWriter(); @Override - public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { - // getBinary always makes a copy, so we don't need to worry about it - // being changed behind our back. - byte[] value = data.getBinary(column); + public void nonNullWrite(int rowId, byte[] value, ColumnVector output) { ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); } } - private static class StringWriter implements SparkOrcValueWriter { + private static class StringWriter implements SparkOrcValueWriter { private static final StringWriter INSTANCE = new StringWriter(); @Override - public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { - byte[] value = data.getUTF8String(column).getBytes(); + public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) { + byte[] value = data.getBytes(); ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); } } - private static class TimestampTzWriter implements SparkOrcValueWriter { + private static class TimestampTzWriter implements SparkOrcValueWriter { private static final TimestampTzWriter INSTANCE = new TimestampTzWriter(); @Override - public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { + public void nonNullWrite(int rowId, Long micros, ColumnVector output) { TimestampColumnVector cv = (TimestampColumnVector) output; - long micros = data.getLong(column); // it could be negative. cv.time[rowId] = Math.floorDiv(micros, 1_000); // millis cv.nanos[rowId] = (int) Math.floorMod(micros, 1_000_000) * 1_000; // nanos } } - private static class Decimal18Writer implements SparkOrcValueWriter { + private static class Decimal18Writer implements SparkOrcValueWriter { private final int precision; private final int scale; @@ -225,13 +225,13 @@ private static class Decimal18Writer implements SparkOrcValueWriter { } @Override - public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { + public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) { ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale( - data.getDecimal(column, precision, scale).toUnscaledLong(), scale); + decimal.toUnscaledLong(), scale); } } - private static class Decimal38Writer implements SparkOrcValueWriter { + private static class Decimal38Writer implements SparkOrcValueWriter { private final int precision; private final int scale; @@ -241,23 +241,26 @@ private static class Decimal38Writer implements SparkOrcValueWriter { } @Override - public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { + public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) { ((DecimalColumnVector) output).vector[rowId].set( - HiveDecimal.create(data.getDecimal(column, precision, scale) - .toJavaBigDecimal())); + HiveDecimal.create(decimal.toJavaBigDecimal())); } } - private static class ListWriter implements SparkOrcValueWriter { + private static class ListWriter implements SparkOrcValueWriter { private final SparkOrcValueWriter writer; + private final SparkOrcWriter.FieldGetter fieldGetter; - ListWriter(SparkOrcValueWriter writer) { + ListWriter(SparkOrcValueWriter writer, List orcTypes) { + if (orcTypes.size() != 1) { + throw new IllegalArgumentException("Expected one (and same) ORC type for list elements, got: " + orcTypes); + } this.writer = writer; + this.fieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 0, 1); } @Override - 
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { - ArrayData value = data.getArray(column); + public void nonNullWrite(int rowId, ArrayData value, ColumnVector output) { ListColumnVector cv = (ListColumnVector) output; // record the length and start of the list elements cv.lengths[rowId] = value.numElements(); @@ -267,7 +270,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV growColumnVector(cv.child, cv.childCount); // Add each element for (int e = 0; e < cv.lengths[rowId]; ++e) { - writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child); + writer.write((int) (e + cv.offsets[rowId]), fieldGetter.getFieldOrNull(value, e), cv.child); } } @@ -277,18 +280,24 @@ public Stream> metrics() { } } - private static class MapWriter implements SparkOrcValueWriter { + private static class MapWriter implements SparkOrcValueWriter { private final SparkOrcValueWriter keyWriter; private final SparkOrcValueWriter valueWriter; + private final SparkOrcWriter.FieldGetter keyFieldGetter; + private final SparkOrcWriter.FieldGetter valueFieldGetter; - MapWriter(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter) { + MapWriter(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter, List orcTypes) { + if (orcTypes.size() != 2) { + throw new IllegalArgumentException("Expected two ORC type descriptions for a map, got: " + orcTypes); + } this.keyWriter = keyWriter; this.valueWriter = valueWriter; + this.keyFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 0, 1); + this.valueFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(1), 0, 1); } @Override - public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { - MapData map = data.getMap(column); + public void nonNullWrite(int rowId, MapData map, ColumnVector output) { ArrayData key = map.keyArray(); ArrayData value = map.valueArray(); MapColumnVector cv = (MapColumnVector) output; @@ -302,8 +311,8 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV // Add each element for (int e = 0; e < cv.lengths[rowId]; ++e) { int pos = (int) (e + cv.offsets[rowId]); - keyWriter.write(pos, e, key, cv.keys); - valueWriter.write(pos, e, value, cv.values); + keyWriter.write(pos, keyFieldGetter.getFieldOrNull(key, e), cv.keys); + valueWriter.write(pos, valueFieldGetter.getFieldOrNull(value, e), cv.values); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 2c1edea1ffef..25ee43f36c15 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -19,14 +19,19 @@ package org.apache.iceberg.spark.data; +import java.io.Serializable; import java.util.List; +import java.util.function.Function; import java.util.stream.Stream; +import javax.annotation.Nullable; + import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcRowWriter; import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; @@ -54,14 +59,12 @@ public SparkOrcWriter(Schema iSchema, 
TypeDescription orcSchema) { @Override public void write(InternalRow value, VectorizedRowBatch output) { Preconditions.checkArgument(writer instanceof StructWriter, "writer must be StructWriter"); + Preconditions.checkArgument(value != null, "value must not be null"); int row = output.size; output.size += 1; - List writers = ((StructWriter) writer).writers(); - for (int c = 0; c < writers.size(); c++) { - SparkOrcValueWriter child = writers.get(c); - child.write(row, c, value, output.cols[c]); - } + + ((StructWriter) writer).rootNonNullWrite(row, value, output); } @Override @@ -76,19 +79,19 @@ private WriteBuilder() { @Override public SparkOrcValueWriter record(Types.StructType iStruct, TypeDescription record, List names, List fields) { - return new StructWriter(fields); + return new StructWriter(fields, record.getChildren()); } @Override public SparkOrcValueWriter list(Types.ListType iList, TypeDescription array, SparkOrcValueWriter element) { - return SparkOrcValueWriters.list(element); + return SparkOrcValueWriters.list(element, array.getChildren()); } @Override public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map, SparkOrcValueWriter key, SparkOrcValueWriter value) { - return SparkOrcValueWriters.map(key, value); + return SparkOrcValueWriters.map(key, value, map.getChildren()); } @Override @@ -126,11 +129,16 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript } } - private static class StructWriter implements SparkOrcValueWriter { + private static class StructWriter implements SparkOrcValueWriter { private final List writers; + private final List fieldGetters; - StructWriter(List writers) { + StructWriter(List writers, List orcTypes) { this.writers = writers; + this.fieldGetters = Lists.newArrayListWithExpectedSize(orcTypes.size()); + for (int i = 0; i < orcTypes.size(); i++) { + fieldGetters.add(createFieldGetter(orcTypes.get(i), i, orcTypes.size())); + } } List writers() { @@ -138,18 +146,98 @@ List writers() { } @Override - public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { - InternalRow value = data.getStruct(column, writers.size()); + public void nonNullWrite(int rowId, InternalRow value, ColumnVector output) { StructColumnVector cv = (StructColumnVector) output; + write(rowId, value, c -> cv.fields[c]); + } + + // Special case of writing the root struct + public void rootNonNullWrite(int rowId, InternalRow value, VectorizedRowBatch output) { + write(rowId, value, c -> output.cols[c]); + } + + private void write(int rowId, InternalRow value, Function colVectorAtFunc) { for (int c = 0; c < writers.size(); ++c) { - writers.get(c).write(rowId, c, value, cv.fields[c]); + writers.get(c).write(rowId, fieldGetters.get(c).getFieldOrNull(value), colVectorAtFunc.apply(c)); } } - @Override - public Stream> metrics() { - return writers.stream().flatMap(SparkOrcValueWriter::metrics); + } + + static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) { + final FieldGetter fieldGetter; + switch (fieldType.getCategory()) { + case BOOLEAN: + fieldGetter = (row, offset) -> row.getBoolean(fieldPos + offset); + break; + case BYTE: + fieldGetter = (row, offset) -> row.getByte(fieldPos + offset); + break; + case SHORT: + fieldGetter = (row, offset) -> row.getShort(fieldPos + offset); + break; + case DATE: + case INT: + fieldGetter = (row, offset) -> row.getInt(fieldPos + offset); + break; + case LONG: + case TIMESTAMP: + case TIMESTAMP_INSTANT: + fieldGetter = (row, offset) 
-> row.getLong(fieldPos + offset); + break; + case FLOAT: + fieldGetter = (row, offset) -> row.getFloat(fieldPos + offset); + break; + case DOUBLE: + fieldGetter = (row, offset) -> row.getDouble(fieldPos + offset); + break; + case BINARY: + fieldGetter = (row, offset) -> row.getBinary(fieldPos + offset); + // getBinary always makes a copy, so we don't need to worry about it + // being changed behind our back. + break; + case DECIMAL: + fieldGetter = (row, offset) -> row.getDecimal(fieldPos + offset, fieldType.getPrecision(), fieldType.getScale()); + break; + case STRING: + case CHAR: + case VARCHAR: + fieldGetter = (row, offset) -> row.getUTF8String(fieldPos + offset); + break; + case STRUCT: + fieldGetter = (row, offset) -> row.getStruct(fieldPos + offset, fieldCount); + break; + case LIST: + fieldGetter = (row, offset) -> row.getArray(fieldPos + offset); + break; + case MAP: + fieldGetter = (row, offset) -> row.getMap(fieldPos + offset); + break; + default: + throw new IllegalArgumentException(); + } + + return (row, offset) -> { + if (row.isNullAt(fieldPos + offset)) { + return null; + } + return fieldGetter.getFieldOrNull(row, offset); + }; + } + + interface FieldGetter extends Serializable { + @Nullable + default Object getFieldOrNull(SpecializedGetters row) { + return getFieldOrNull(row, 0); } + /** + * + * @param row Spark's data representation + * @param offset added to ordinal before being passed to respective getter method - used for complex types only + * @return + */ + @Nullable + Object getFieldOrNull(SpecializedGetters row, int offset); } } From 8294fec124e69fd298d816614e22abe92153e8e4 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 6 Oct 2021 15:42:22 +0200 Subject: [PATCH 02/11] refactor: removing SparkOrcValueWriter --- .../spark/data/SparkOrcValueWriter.java | 59 -------- .../spark/data/SparkOrcValueWriters.java | 134 +++++++++++++----- .../iceberg/spark/data/SparkOrcWriter.java | 31 ++-- 3 files changed, 120 insertions(+), 104 deletions(-) delete mode 100644 spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java deleted file mode 100644 index b326f689bbc3..000000000000 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.spark.data; - -import java.util.stream.Stream; -import org.apache.iceberg.FieldMetrics; -import org.apache.orc.storage.ql.exec.vector.ColumnVector; -import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; - -interface SparkOrcValueWriter { - - /** - * Take a value from the data and add it to the ORC output. - * - * @param rowId the row id in the ColumnVector. - * @param column the column number. - * @param data the data value to write. - * @param output the ColumnVector to put the value into. - */ - default void write(int rowId, T data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - nonNullWrite(rowId, data, output); - } - } - - void nonNullWrite(int rowId, T data, ColumnVector output); - - /** - * Returns a stream of {@link FieldMetrics} that this SparkOrcValueWriter keeps track of. - *
- * Since ORC keeps track of most metrics via column statistics, for now SparkOrcValueWriter only keeps track of NaN - * counters, and only return non-empty stream if the writer writes double or float values either by itself or - * transitively. - */ - default Stream> metrics() { - return Stream.empty(); - } -} diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index 14eda2403a83..8d22984331c1 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -24,6 +24,7 @@ import org.apache.iceberg.DoubleFieldMetrics; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FloatFieldMetrics; +import org.apache.iceberg.orc.OrcValueWriter; import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; @@ -45,47 +46,47 @@ class SparkOrcValueWriters { private SparkOrcValueWriters() { } - static SparkOrcValueWriter booleans() { + static OrcValueWriter booleans() { return BooleanWriter.INSTANCE; } - static SparkOrcValueWriter bytes() { + static OrcValueWriter bytes() { return ByteWriter.INSTANCE; } - static SparkOrcValueWriter shorts() { + static OrcValueWriter shorts() { return ShortWriter.INSTANCE; } - static SparkOrcValueWriter ints() { + static OrcValueWriter ints() { return IntWriter.INSTANCE; } - static SparkOrcValueWriter longs() { + static OrcValueWriter longs() { return LongWriter.INSTANCE; } - static SparkOrcValueWriter floats(int id) { + static OrcValueWriter floats(int id) { return new FloatWriter(id); } - static SparkOrcValueWriter doubles(int id) { + static OrcValueWriter doubles(int id) { return new DoubleWriter(id); } - static SparkOrcValueWriter byteArrays() { + static OrcValueWriter byteArrays() { return BytesWriter.INSTANCE; } - static SparkOrcValueWriter strings() { + static OrcValueWriter strings() { return StringWriter.INSTANCE; } - static SparkOrcValueWriter timestampTz() { + static OrcValueWriter timestampTz() { return TimestampTzWriter.INSTANCE; } - static SparkOrcValueWriter decimal(int precision, int scale) { + static OrcValueWriter decimal(int precision, int scale) { if (precision <= 18) { return new Decimal18Writer(precision, scale); } else { @@ -93,61 +94,86 @@ static SparkOrcValueWriter decimal(int precision, int scale) { } } - static SparkOrcValueWriter list(SparkOrcValueWriter element, List orcType) { + static OrcValueWriter list(OrcValueWriter element, List orcType) { return new ListWriter(element, orcType); } - static SparkOrcValueWriter map(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter, + static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueWriter valueWriter, List orcType) { return new MapWriter(keyWriter, valueWriter, orcType); } - private static class BooleanWriter implements SparkOrcValueWriter { + private static class BooleanWriter implements OrcValueWriter { private static final BooleanWriter INSTANCE = new BooleanWriter(); @Override public void nonNullWrite(int rowId, Boolean data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data ? 
1 : 0; } + + @Override + public Class getJavaClass() { + return Boolean.class; + } } - private static class ByteWriter implements SparkOrcValueWriter { + private static class ByteWriter implements OrcValueWriter { private static final ByteWriter INSTANCE = new ByteWriter(); @Override public void nonNullWrite(int rowId, Byte data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; } + + @Override + public Class getJavaClass() { + return Byte.class; + } } - private static class ShortWriter implements SparkOrcValueWriter { + private static class ShortWriter implements OrcValueWriter { private static final ShortWriter INSTANCE = new ShortWriter(); @Override public void nonNullWrite(int rowId, Short data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; } + + @Override + public Class getJavaClass() { + return Short.class; + } } - private static class IntWriter implements SparkOrcValueWriter { + private static class IntWriter implements OrcValueWriter { private static final IntWriter INSTANCE = new IntWriter(); @Override public void nonNullWrite(int rowId, Integer data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; } + + @Override + public Class getJavaClass() { + return Integer.class; + } } - private static class LongWriter implements SparkOrcValueWriter { + private static class LongWriter implements OrcValueWriter { private static final LongWriter INSTANCE = new LongWriter(); @Override public void nonNullWrite(int rowId, Long data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; } + + @Override + public Class getJavaClass() { + return Long.class; + } } - private static class FloatWriter implements SparkOrcValueWriter { + private static class FloatWriter implements OrcValueWriter { private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; private FloatWriter(int id) { @@ -164,9 +190,13 @@ public void nonNullWrite(int rowId, Float floatValue, ColumnVector output) { public Stream> metrics() { return Stream.of(floatFieldMetricsBuilder.build()); } + @Override + public Class getJavaClass() { + return Float.class; + } } - private static class DoubleWriter implements SparkOrcValueWriter { + private static class DoubleWriter implements OrcValueWriter { private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder; private DoubleWriter(int id) { @@ -183,18 +213,28 @@ public void nonNullWrite(int rowId, Double doubleValue, ColumnVector output) { public Stream> metrics() { return Stream.of(doubleFieldMetricsBuilder.build()); } + + @Override + public Class getJavaClass() { + return Double.class; + } } - private static class BytesWriter implements SparkOrcValueWriter { + private static class BytesWriter implements OrcValueWriter { private static final BytesWriter INSTANCE = new BytesWriter(); @Override public void nonNullWrite(int rowId, byte[] value, ColumnVector output) { ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); } + + @Override + public Class getJavaClass() { + return byte[].class; + } } - private static class StringWriter implements SparkOrcValueWriter { + private static class StringWriter implements OrcValueWriter { private static final StringWriter INSTANCE = new StringWriter(); @Override @@ -202,9 +242,14 @@ public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) { byte[] value = data.getBytes(); ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); } + + @Override + public Class getJavaClass() { + return UTF8String.class; + } } - 
private static class TimestampTzWriter implements SparkOrcValueWriter { + private static class TimestampTzWriter implements OrcValueWriter { private static final TimestampTzWriter INSTANCE = new TimestampTzWriter(); @Override @@ -213,9 +258,14 @@ public void nonNullWrite(int rowId, Long micros, ColumnVector output) { cv.time[rowId] = Math.floorDiv(micros, 1_000); // millis cv.nanos[rowId] = (int) Math.floorMod(micros, 1_000_000) * 1_000; // nanos } + + @Override + public Class getJavaClass() { + return Long.class; + } } - private static class Decimal18Writer implements SparkOrcValueWriter { + private static class Decimal18Writer implements OrcValueWriter { private final int precision; private final int scale; @@ -229,9 +279,14 @@ public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) { ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale( decimal.toUnscaledLong(), scale); } + + @Override + public Class getJavaClass() { + return Decimal.class; + } } - private static class Decimal38Writer implements SparkOrcValueWriter { + private static class Decimal38Writer implements OrcValueWriter { private final int precision; private final int scale; @@ -245,13 +300,18 @@ public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) { ((DecimalColumnVector) output).vector[rowId].set( HiveDecimal.create(decimal.toJavaBigDecimal())); } + + @Override + public Class getJavaClass() { + return Decimal.class; + } } - private static class ListWriter implements SparkOrcValueWriter { - private final SparkOrcValueWriter writer; + private static class ListWriter implements OrcValueWriter { + private final OrcValueWriter writer; private final SparkOrcWriter.FieldGetter fieldGetter; - ListWriter(SparkOrcValueWriter writer, List orcTypes) { + ListWriter(OrcValueWriter writer, List orcTypes) { if (orcTypes.size() != 1) { throw new IllegalArgumentException("Expected one (and same) ORC type for list elements, got: " + orcTypes); } @@ -278,15 +338,20 @@ public void nonNullWrite(int rowId, ArrayData value, ColumnVector output) { public Stream> metrics() { return writer.metrics(); } + + @Override + public Class getJavaClass() { + return ArrayData.class; + } } - private static class MapWriter implements SparkOrcValueWriter { - private final SparkOrcValueWriter keyWriter; - private final SparkOrcValueWriter valueWriter; + private static class MapWriter implements OrcValueWriter { + private final OrcValueWriter keyWriter; + private final OrcValueWriter valueWriter; private final SparkOrcWriter.FieldGetter keyFieldGetter; private final SparkOrcWriter.FieldGetter valueFieldGetter; - MapWriter(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter, List orcTypes) { + MapWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter, List orcTypes) { if (orcTypes.size() != 2) { throw new IllegalArgumentException("Expected two ORC type descriptions for a map, got: " + orcTypes); } @@ -320,6 +385,11 @@ public void nonNullWrite(int rowId, MapData map, ColumnVector output) { public Stream> metrics() { return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); } + + @Override + public Class getJavaClass() { + return MapData.class; + } } private static void growColumnVector(ColumnVector cv, int requestedSize) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 25ee43f36c15..e64f7ca2766a 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -30,6 +30,7 @@ import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcRowWriter; import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -47,7 +48,7 @@ */ public class SparkOrcWriter implements OrcRowWriter { - private final SparkOrcValueWriter writer; + private final OrcValueWriter writer; public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) { Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, @@ -72,30 +73,30 @@ public Stream> metrics() { return writer.metrics(); } - private static class WriteBuilder extends OrcSchemaWithTypeVisitor { + private static class WriteBuilder extends OrcSchemaWithTypeVisitor { private WriteBuilder() { } @Override - public SparkOrcValueWriter record(Types.StructType iStruct, TypeDescription record, - List names, List fields) { + public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, + List names, List fields) { return new StructWriter(fields, record.getChildren()); } @Override - public SparkOrcValueWriter list(Types.ListType iList, TypeDescription array, - SparkOrcValueWriter element) { + public OrcValueWriter list(Types.ListType iList, TypeDescription array, + OrcValueWriter element) { return SparkOrcValueWriters.list(element, array.getChildren()); } @Override - public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map, - SparkOrcValueWriter key, SparkOrcValueWriter value) { + public OrcValueWriter map(Types.MapType iMap, TypeDescription map, + OrcValueWriter key, OrcValueWriter value) { return SparkOrcValueWriters.map(key, value, map.getChildren()); } @Override - public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { switch (primitive.getCategory()) { case BOOLEAN: return SparkOrcValueWriters.booleans(); @@ -129,11 +130,11 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript } } - private static class StructWriter implements SparkOrcValueWriter { - private final List writers; + private static class StructWriter implements OrcValueWriter { + private final List writers; private final List fieldGetters; - StructWriter(List writers, List orcTypes) { + StructWriter(List writers, List orcTypes) { this.writers = writers; this.fieldGetters = Lists.newArrayListWithExpectedSize(orcTypes.size()); for (int i = 0; i < orcTypes.size(); i++) { @@ -141,7 +142,7 @@ private static class StructWriter implements SparkOrcValueWriter { } } - List writers() { + List writers() { return writers; } @@ -162,6 +163,10 @@ private void write(int rowId, InternalRow value, Function } } + @Override + public Class getJavaClass() { + return InternalRow.class; + } } static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) { From d91cd0a898639ce60f00b1f60929dad16f96d13e Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 6 Oct 2021 16:01:33 +0200 Subject: [PATCH 03/11] refactor: fixing decimal params, fixing raw usages of generic OrcValueWriter type --- .../spark/data/SparkOrcValueWriters.java | 45 ++++++++----------- .../iceberg/spark/data/SparkOrcWriter.java | 40 ++++++++--------- 2 
files changed, 37 insertions(+), 48 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index 8d22984331c1..ccb08384a69b 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -46,59 +46,59 @@ class SparkOrcValueWriters { private SparkOrcValueWriters() { } - static OrcValueWriter booleans() { + static OrcValueWriter booleans() { return BooleanWriter.INSTANCE; } - static OrcValueWriter bytes() { + static OrcValueWriter bytes() { return ByteWriter.INSTANCE; } - static OrcValueWriter shorts() { + static OrcValueWriter shorts() { return ShortWriter.INSTANCE; } - static OrcValueWriter ints() { + static OrcValueWriter ints() { return IntWriter.INSTANCE; } - static OrcValueWriter longs() { + static OrcValueWriter longs() { return LongWriter.INSTANCE; } - static OrcValueWriter floats(int id) { + static OrcValueWriter floats(int id) { return new FloatWriter(id); } - static OrcValueWriter doubles(int id) { + static OrcValueWriter doubles(int id) { return new DoubleWriter(id); } - static OrcValueWriter byteArrays() { + static OrcValueWriter byteArrays() { return BytesWriter.INSTANCE; } - static OrcValueWriter strings() { + static OrcValueWriter strings() { return StringWriter.INSTANCE; } - static OrcValueWriter timestampTz() { + static OrcValueWriter timestampTz() { return TimestampTzWriter.INSTANCE; } - static OrcValueWriter decimal(int precision, int scale) { + static OrcValueWriter decimal(int precision, int scale) { if (precision <= 18) { - return new Decimal18Writer(precision, scale); + return new Decimal18Writer(scale); } else { - return new Decimal38Writer(precision, scale); + return new Decimal38Writer(); } } - static OrcValueWriter list(OrcValueWriter element, List orcType) { + static OrcValueWriter list(OrcValueWriter element, List orcType) { return new ListWriter(element, orcType); } - static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueWriter valueWriter, + static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueWriter valueWriter, List orcType) { return new MapWriter(keyWriter, valueWriter, orcType); } @@ -266,11 +266,9 @@ public Class getJavaClass() { } private static class Decimal18Writer implements OrcValueWriter { - private final int precision; private final int scale; - Decimal18Writer(int precision, int scale) { - this.precision = precision; + Decimal18Writer(int scale) { this.scale = scale; } @@ -287,13 +285,6 @@ public Class getJavaClass() { } private static class Decimal38Writer implements OrcValueWriter { - private final int precision; - private final int scale; - - Decimal38Writer(int precision, int scale) { - this.precision = precision; - this.scale = scale; - } @Override public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) { @@ -311,7 +302,7 @@ private static class ListWriter implements OrcValueWriter { private final OrcValueWriter writer; private final SparkOrcWriter.FieldGetter fieldGetter; - ListWriter(OrcValueWriter writer, List orcTypes) { + ListWriter(OrcValueWriter writer, List orcTypes) { if (orcTypes.size() != 1) { throw new IllegalArgumentException("Expected one (and same) ORC type for list elements, got: " + orcTypes); } @@ -351,7 +342,7 @@ private static class MapWriter implements OrcValueWriter { private final SparkOrcWriter.FieldGetter keyFieldGetter; private final 
SparkOrcWriter.FieldGetter valueFieldGetter; - MapWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter, List orcTypes) { + MapWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter, List orcTypes) { if (orcTypes.size() != 2) { throw new IllegalArgumentException("Expected two ORC type descriptions for a map, got: " + orcTypes); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index e64f7ca2766a..a1a0d1001027 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -48,7 +48,7 @@ */ public class SparkOrcWriter implements OrcRowWriter { - private final OrcValueWriter writer; + private final OrcValueWriter writer; public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) { Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, @@ -73,30 +73,30 @@ public Stream> metrics() { return writer.metrics(); } - private static class WriteBuilder extends OrcSchemaWithTypeVisitor { + private static class WriteBuilder extends OrcSchemaWithTypeVisitor> { private WriteBuilder() { } @Override - public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, - List names, List fields) { + public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, + List names, List> fields) { return new StructWriter(fields, record.getChildren()); } @Override - public OrcValueWriter list(Types.ListType iList, TypeDescription array, - OrcValueWriter element) { + public OrcValueWriter list(Types.ListType iList, TypeDescription array, + OrcValueWriter element) { return SparkOrcValueWriters.list(element, array.getChildren()); } @Override - public OrcValueWriter map(Types.MapType iMap, TypeDescription map, - OrcValueWriter key, OrcValueWriter value) { + public OrcValueWriter map(Types.MapType iMap, TypeDescription map, + OrcValueWriter key, OrcValueWriter value) { return SparkOrcValueWriters.map(key, value, map.getChildren()); } @Override - public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { switch (primitive.getCategory()) { case BOOLEAN: return SparkOrcValueWriters.booleans(); @@ -132,20 +132,18 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription p private static class StructWriter implements OrcValueWriter { private final List writers; - private final List fieldGetters; + private final List> fieldGetters; - StructWriter(List writers, List orcTypes) { - this.writers = writers; + StructWriter(List> writers, List orcTypes) { + this.writers = Lists.newArrayListWithExpectedSize(writers.size()); this.fieldGetters = Lists.newArrayListWithExpectedSize(orcTypes.size()); + for (int i = 0; i < orcTypes.size(); i++) { + this.writers.add(writers.get(i)); fieldGetters.add(createFieldGetter(orcTypes.get(i), i, orcTypes.size())); } } - List writers() { - return writers; - } - @Override public void nonNullWrite(int rowId, InternalRow value, ColumnVector output) { StructColumnVector cv = (StructColumnVector) output; @@ -169,8 +167,8 @@ public Class getJavaClass() { } } - static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) { - final FieldGetter fieldGetter; + static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) { 
-    final FieldGetter fieldGetter;
+    final FieldGetter<?> fieldGetter;
     switch (fieldType.getCategory()) {
       case BOOLEAN:
         fieldGetter = (row, offset) -> row.getBoolean(fieldPos + offset);
         break;
@@ -230,9 +228,9 @@ static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, in
     };
   }
 
-  interface FieldGetter extends Serializable {
+  interface FieldGetter<T> extends Serializable {
     @Nullable
-    default Object getFieldOrNull(SpecializedGetters row) {
+    default T getFieldOrNull(SpecializedGetters row) {
       return getFieldOrNull(row, 0);
     }
 
@@ -243,6 +241,6 @@ default Object getFieldOrNull(SpecializedGetters row) {
      *
      * @param row Spark's data representation
      * @param offset added to ordinal before being passed to respective getter method - used for complex types only
      * @return
      */
     @Nullable
-    Object getFieldOrNull(SpecializedGetters row, int offset);
+    T getFieldOrNull(SpecializedGetters row, int offset);
   }
 }

From f39396f0d4b6e07d524d38e9dd4b690d83000e0b Mon Sep 17 00:00:00 2001
From: Adam Szita
Date: Wed, 6 Oct 2021 16:05:47 +0200
Subject: [PATCH 04/11] refactor: checkstyle fixes

---
 .../apache/iceberg/spark/data/SparkOrcValueWriters.java | 2 --
 .../org/apache/iceberg/spark/data/SparkOrcWriter.java   | 8 ++++----
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index ccb08384a69b..f6ec00a60651 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -35,8 +35,6 @@
 import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
 import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index a1a0d1001027..6a322e3e80a6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -24,7 +24,6 @@
 import java.util.function.Function;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
-
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.orc.ORCSchemaUtil;
@@ -167,7 +166,7 @@ public Class<?> getJavaClass() {
     }
   }
 
-  static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) {
+  static FieldGetter<?> createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) {
     final FieldGetter<?> fieldGetter;
     switch (fieldType.getCategory()) {
       case BOOLEAN:
@@ -200,7 +199,8 @@ static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos,
         // being changed behind our back.
break; case DECIMAL: - fieldGetter = (row, offset) -> row.getDecimal(fieldPos + offset, fieldType.getPrecision(), fieldType.getScale()); + fieldGetter = (row, offset) -> + row.getDecimal(fieldPos + offset, fieldType.getPrecision(), fieldType.getScale()); break; case STRING: case CHAR: @@ -238,7 +238,7 @@ default T getFieldOrNull(SpecializedGetters row) { * * @param row Spark's data representation * @param offset added to ordinal before being passed to respective getter method - used for complex types only - * @return + * @return field value at ordinal and offset */ @Nullable T getFieldOrNull(SpecializedGetters row, int offset); From f0ce178008eb710bbb0224c793abbd88e6ff42ee Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 6 Oct 2021 16:14:10 +0200 Subject: [PATCH 05/11] refactor: metrics fix --- .../java/org/apache/iceberg/spark/data/SparkOrcWriter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 6a322e3e80a6..de1059090be6 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -164,6 +164,12 @@ private void write(int rowId, InternalRow value, Function public Class getJavaClass() { return InternalRow.class; } + + @Override + public Stream> metrics() { + return writers.stream().flatMap(OrcValueWriter::metrics); + } + } static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) { From e5499afaf5469d361c2deeeefd3e452b7142abb3 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Thu, 7 Oct 2021 11:48:55 +0200 Subject: [PATCH 06/11] refactor: remove code duplicates of OrcValueWriters --- .../iceberg/data/orc/GenericOrcWriters.java | 16 ++- .../spark/data/SparkOrcValueWriters.java | 136 ++---------------- 2 files changed, 26 insertions(+), 126 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 7efa1613de97..0eb6043455cb 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -219,11 +219,11 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) { } } - private static class FloatWriter implements OrcValueWriter { + public static class FloatWriter implements OrcValueWriter { private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; private long nullValueCount = 0; - private FloatWriter(int id) { + protected FloatWriter(int id) { this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); } @@ -251,13 +251,17 @@ public Stream> metrics() { nullValueCount, metricsWithoutNullCount.nanValueCount(), metricsWithoutNullCount.lowerBound(), metricsWithoutNullCount.upperBound())); } + + protected FloatFieldMetrics.Builder getFloatFieldMetricsBuilder() { + return floatFieldMetricsBuilder; + } } - private static class DoubleWriter implements OrcValueWriter { + public static class DoubleWriter implements OrcValueWriter { private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder; private long nullValueCount = 0; - private DoubleWriter(Integer id) { + protected DoubleWriter(Integer id) { this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id); } @@ -285,6 +289,10 @@ public Stream> metrics() { nullValueCount, 
metricsWithoutNullCount.nanValueCount(), metricsWithoutNullCount.lowerBound(), metricsWithoutNullCount.upperBound())); } + + public DoubleFieldMetrics.Builder getDoubleFieldMetricsBuilder() { + return doubleFieldMetricsBuilder; + } } private static class StringWriter implements OrcValueWriter { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index f6ec00a60651..e231a1b41eae 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -21,18 +21,15 @@ import java.util.List; import java.util.stream.Stream; -import org.apache.iceberg.DoubleFieldMetrics; import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.FloatFieldMetrics; +import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.ColumnVector; import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; -import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; import org.apache.orc.storage.ql.exec.vector.ListColumnVector; -import org.apache.orc.storage.ql.exec.vector.LongColumnVector; import org.apache.orc.storage.ql.exec.vector.MapColumnVector; import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; import org.apache.spark.sql.catalyst.util.ArrayData; @@ -45,23 +42,23 @@ private SparkOrcValueWriters() { } static OrcValueWriter booleans() { - return BooleanWriter.INSTANCE; + return GenericOrcWriters.booleans(); } static OrcValueWriter bytes() { - return ByteWriter.INSTANCE; + return GenericOrcWriters.bytes(); } static OrcValueWriter shorts() { - return ShortWriter.INSTANCE; + return GenericOrcWriters.shorts(); } static OrcValueWriter ints() { - return IntWriter.INSTANCE; + return GenericOrcWriters.ints(); } static OrcValueWriter longs() { - return LongWriter.INSTANCE; + return GenericOrcWriters.longs(); } static OrcValueWriter floats(int id) { @@ -73,7 +70,7 @@ static OrcValueWriter doubles(int id) { } static OrcValueWriter byteArrays() { - return BytesWriter.INSTANCE; + return GenericOrcWriters.byteArrays(); } static OrcValueWriter strings() { @@ -101,135 +98,30 @@ static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueWriter valu return new MapWriter(keyWriter, valueWriter, orcType); } - private static class BooleanWriter implements OrcValueWriter { - private static final BooleanWriter INSTANCE = new BooleanWriter(); - - @Override - public void nonNullWrite(int rowId, Boolean data, ColumnVector output) { - ((LongColumnVector) output).vector[rowId] = data ? 
1 : 0; - } - - @Override - public Class getJavaClass() { - return Boolean.class; - } - } - - private static class ByteWriter implements OrcValueWriter { - private static final ByteWriter INSTANCE = new ByteWriter(); - - @Override - public void nonNullWrite(int rowId, Byte data, ColumnVector output) { - ((LongColumnVector) output).vector[rowId] = data; - } - - @Override - public Class getJavaClass() { - return Byte.class; - } - } - - private static class ShortWriter implements OrcValueWriter { - private static final ShortWriter INSTANCE = new ShortWriter(); - - @Override - public void nonNullWrite(int rowId, Short data, ColumnVector output) { - ((LongColumnVector) output).vector[rowId] = data; - } - - @Override - public Class getJavaClass() { - return Short.class; - } - } - - private static class IntWriter implements OrcValueWriter { - private static final IntWriter INSTANCE = new IntWriter(); - - @Override - public void nonNullWrite(int rowId, Integer data, ColumnVector output) { - ((LongColumnVector) output).vector[rowId] = data; - } - - @Override - public Class getJavaClass() { - return Integer.class; - } - } - - private static class LongWriter implements OrcValueWriter { - private static final LongWriter INSTANCE = new LongWriter(); - - @Override - public void nonNullWrite(int rowId, Long data, ColumnVector output) { - ((LongColumnVector) output).vector[rowId] = data; - } - - @Override - public Class getJavaClass() { - return Long.class; - } - } - - private static class FloatWriter implements OrcValueWriter { - private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; + private static class FloatWriter extends GenericOrcWriters.FloatWriter { private FloatWriter(int id) { - this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); - } - - @Override - public void nonNullWrite(int rowId, Float floatValue, ColumnVector output) { - ((DoubleColumnVector) output).vector[rowId] = floatValue; - floatFieldMetricsBuilder.addValue(floatValue); + super(id); } @Override public Stream> metrics() { - return Stream.of(floatFieldMetricsBuilder.build()); - } - @Override - public Class getJavaClass() { - return Float.class; + return Stream.of(getFloatFieldMetricsBuilder().build()); } + } - private static class DoubleWriter implements OrcValueWriter { - private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder; + private static class DoubleWriter extends GenericOrcWriters.DoubleWriter { private DoubleWriter(int id) { - this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id); - } - - @Override - public void nonNullWrite(int rowId, Double doubleValue, ColumnVector output) { - ((DoubleColumnVector) output).vector[rowId] = doubleValue; - doubleFieldMetricsBuilder.addValue(doubleValue); + super(id); } @Override public Stream> metrics() { - return Stream.of(doubleFieldMetricsBuilder.build()); + return Stream.of(getDoubleFieldMetricsBuilder().build()); } - @Override - public Class getJavaClass() { - return Double.class; - } - } - - private static class BytesWriter implements OrcValueWriter { - private static final BytesWriter INSTANCE = new BytesWriter(); - - @Override - public void nonNullWrite(int rowId, byte[] value, ColumnVector output) { - ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); - } - - @Override - public Class getJavaClass() { - return byte[].class; - } } private static class StringWriter implements OrcValueWriter { From bbed0a09a9f1c02b39ebf411ef00889ed814a203 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Thu, 7 Oct 2021 16:26:26 +0200 
Subject: [PATCH 07/11] refactor: pvary review, using ordinal instead of offset, more GenericOrcValueWriter reuses --- .../iceberg/data/orc/GenericOrcWriters.java | 16 ++--- .../spark/data/SparkOrcValueWriters.java | 65 +---------------- .../iceberg/spark/data/SparkOrcWriter.java | 70 +++++++++---------- 3 files changed, 41 insertions(+), 110 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 0eb6043455cb..7efa1613de97 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -219,11 +219,11 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) { } } - public static class FloatWriter implements OrcValueWriter { + private static class FloatWriter implements OrcValueWriter { private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; private long nullValueCount = 0; - protected FloatWriter(int id) { + private FloatWriter(int id) { this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); } @@ -251,17 +251,13 @@ public Stream> metrics() { nullValueCount, metricsWithoutNullCount.nanValueCount(), metricsWithoutNullCount.lowerBound(), metricsWithoutNullCount.upperBound())); } - - protected FloatFieldMetrics.Builder getFloatFieldMetricsBuilder() { - return floatFieldMetricsBuilder; - } } - public static class DoubleWriter implements OrcValueWriter { + private static class DoubleWriter implements OrcValueWriter { private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder; private long nullValueCount = 0; - protected DoubleWriter(Integer id) { + private DoubleWriter(Integer id) { this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id); } @@ -289,10 +285,6 @@ public Stream> metrics() { nullValueCount, metricsWithoutNullCount.nanValueCount(), metricsWithoutNullCount.lowerBound(), metricsWithoutNullCount.upperBound())); } - - public DoubleFieldMetrics.Builder getDoubleFieldMetricsBuilder() { - return doubleFieldMetricsBuilder; - } } private static class StringWriter implements OrcValueWriter { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index e231a1b41eae..38170fa51e5b 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.stream.Stream; import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; @@ -41,38 +40,6 @@ class SparkOrcValueWriters { private SparkOrcValueWriters() { } - static OrcValueWriter booleans() { - return GenericOrcWriters.booleans(); - } - - static OrcValueWriter bytes() { - return GenericOrcWriters.bytes(); - } - - static OrcValueWriter shorts() { - return GenericOrcWriters.shorts(); - } - - static OrcValueWriter ints() { - return GenericOrcWriters.ints(); - } - - static OrcValueWriter longs() { - return GenericOrcWriters.longs(); - } - - static OrcValueWriter floats(int id) { - return new FloatWriter(id); - } - - static OrcValueWriter doubles(int id) { - return new DoubleWriter(id); - } - - static OrcValueWriter byteArrays() { - return 
GenericOrcWriters.byteArrays(); - } - static OrcValueWriter strings() { return StringWriter.INSTANCE; } @@ -98,32 +65,6 @@ static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueWriter valu return new MapWriter(keyWriter, valueWriter, orcType); } - private static class FloatWriter extends GenericOrcWriters.FloatWriter { - - private FloatWriter(int id) { - super(id); - } - - @Override - public Stream> metrics() { - return Stream.of(getFloatFieldMetricsBuilder().build()); - } - - } - - private static class DoubleWriter extends GenericOrcWriters.DoubleWriter { - - private DoubleWriter(int id) { - super(id); - } - - @Override - public Stream> metrics() { - return Stream.of(getDoubleFieldMetricsBuilder().build()); - } - - } - private static class StringWriter implements OrcValueWriter { private static final StringWriter INSTANCE = new StringWriter(); @@ -197,7 +138,7 @@ private static class ListWriter implements OrcValueWriter { throw new IllegalArgumentException("Expected one (and same) ORC type for list elements, got: " + orcTypes); } this.writer = writer; - this.fieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 0, 1); + this.fieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 1); } @Override @@ -238,8 +179,8 @@ private static class MapWriter implements OrcValueWriter { } this.keyWriter = keyWriter; this.valueWriter = valueWriter; - this.keyFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 0, 1); - this.valueFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(1), 0, 1); + this.keyFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 1); + this.valueFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(1), 1); } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index de1059090be6..1526c3e02ed0 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcRowWriter; import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; @@ -98,22 +99,22 @@ public OrcValueWriter map(Types.MapType iMap, TypeDescription map, public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { switch (primitive.getCategory()) { case BOOLEAN: - return SparkOrcValueWriters.booleans(); + return GenericOrcWriters.booleans(); case BYTE: - return SparkOrcValueWriters.bytes(); + return GenericOrcWriters.bytes(); case SHORT: - return SparkOrcValueWriters.shorts(); + return GenericOrcWriters.shorts(); case DATE: case INT: - return SparkOrcValueWriters.ints(); + return GenericOrcWriters.ints(); case LONG: - return SparkOrcValueWriters.longs(); + return GenericOrcWriters.longs(); case FLOAT: - return SparkOrcValueWriters.floats(ORCSchemaUtil.fieldId(primitive)); + return GenericOrcWriters.floats(ORCSchemaUtil.fieldId(primitive)); case DOUBLE: - return SparkOrcValueWriters.doubles(ORCSchemaUtil.fieldId(primitive)); + return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive)); case BINARY: - return SparkOrcValueWriters.byteArrays(); + return GenericOrcWriters.byteArrays(); case STRING: case CHAR: case VARCHAR: @@ -139,7 +140,7 @@ private static class StructWriter 
implements OrcValueWriter { for (int i = 0; i < orcTypes.size(); i++) { this.writers.add(writers.get(i)); - fieldGetters.add(createFieldGetter(orcTypes.get(i), i, orcTypes.size())); + fieldGetters.add(createFieldGetter(orcTypes.get(i), orcTypes.size())); } } @@ -156,7 +157,7 @@ public void rootNonNullWrite(int rowId, InternalRow value, VectorizedRowBatch ou private void write(int rowId, InternalRow value, Function colVectorAtFunc) { for (int c = 0; c < writers.size(); ++c) { - writers.get(c).write(rowId, fieldGetters.get(c).getFieldOrNull(value), colVectorAtFunc.apply(c)); + writers.get(c).write(rowId, fieldGetters.get(c).getFieldOrNull(value, c), colVectorAtFunc.apply(c)); } } @@ -172,81 +173,78 @@ public Stream> metrics() { } - static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldPos, int fieldCount) { + static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldCount) { final FieldGetter fieldGetter; switch (fieldType.getCategory()) { case BOOLEAN: - fieldGetter = (row, offset) -> row.getBoolean(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getBoolean(ordinal); break; case BYTE: - fieldGetter = (row, offset) -> row.getByte(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getByte(ordinal); break; case SHORT: - fieldGetter = (row, offset) -> row.getShort(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getShort(ordinal); break; case DATE: case INT: - fieldGetter = (row, offset) -> row.getInt(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getInt(ordinal); break; case LONG: case TIMESTAMP: case TIMESTAMP_INSTANT: - fieldGetter = (row, offset) -> row.getLong(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getLong(ordinal); break; case FLOAT: - fieldGetter = (row, offset) -> row.getFloat(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getFloat(ordinal); break; case DOUBLE: - fieldGetter = (row, offset) -> row.getDouble(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getDouble(ordinal); break; case BINARY: - fieldGetter = (row, offset) -> row.getBinary(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getBinary(ordinal); // getBinary always makes a copy, so we don't need to worry about it // being changed behind our back. 
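// [Editor's note - an illustrative sketch, not part of the original patch; "binaryGetter",
// "wrappedGetter" and "someRow" are hypothetical names.] After this refactor each case
// reduces to a plain (row, ordinal) lambda, e.g. the BINARY branch above amounts to:
//   FieldGetter<byte[]> binaryGetter = (row, ordinal) -> row.getBinary(ordinal);
// createFieldGetter then wraps it in the null check shown just below, so a caller can
// safely do:
//   byte[] bytes = wrappedGetter.getFieldOrNull(someRow, 0); // null when the field is null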
break; case DECIMAL: - fieldGetter = (row, offset) -> - row.getDecimal(fieldPos + offset, fieldType.getPrecision(), fieldType.getScale()); + fieldGetter = (row, ordinal) -> + row.getDecimal(ordinal, fieldType.getPrecision(), fieldType.getScale()); break; case STRING: case CHAR: case VARCHAR: - fieldGetter = (row, offset) -> row.getUTF8String(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getUTF8String(ordinal); break; case STRUCT: - fieldGetter = (row, offset) -> row.getStruct(fieldPos + offset, fieldCount); + fieldGetter = (row, ordinal) -> row.getStruct(ordinal, fieldCount); break; case LIST: - fieldGetter = (row, offset) -> row.getArray(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getArray(ordinal); break; case MAP: - fieldGetter = (row, offset) -> row.getMap(fieldPos + offset); + fieldGetter = (row, ordinal) -> row.getMap(ordinal); break; default: throw new IllegalArgumentException(); } - return (row, offset) -> { - if (row.isNullAt(fieldPos + offset)) { + return (row, ordinal) -> { + if (row.isNullAt(ordinal)) { return null; } - return fieldGetter.getFieldOrNull(row, offset); + return fieldGetter.getFieldOrNull(row, ordinal); }; } interface FieldGetter extends Serializable { - @Nullable - default T getFieldOrNull(SpecializedGetters row) { - return getFieldOrNull(row, 0); - } /** - * + * Returns a value from a complex Spark data holder such as ArrayData, InternalRow, etc. + * Calls the appropriate getter for the expected data type. * @param row Spark's data representation - * @param offset added to ordinal before being passed to respective getter method - used for complex types only - * @return field value at ordinal and offset + * @param ordinal index in the data structure (e.g. column index for InternalRow, list index in ArrayData, etc.)
+ * @return field value at ordinal */ @Nullable - T getFieldOrNull(SpecializedGetters row, int offset); + T getFieldOrNull(SpecializedGetters row, int ordinal); } } From 4df4c19962189b90f4bb81e1d1f2b65c0e3f5968 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Fri, 8 Oct 2021 09:48:30 +0200 Subject: [PATCH 08/11] refactor: create and inherit from an abstract StructWriter for ORC --- .../iceberg/data/orc/GenericOrcWriter.java | 62 ++++++++++++------- .../iceberg/flink/data/FlinkOrcWriter.java | 20 ++---- .../iceberg/flink/data/FlinkOrcWriters.java | 28 +++------ .../iceberg/spark/data/SparkOrcWriter.java | 44 ++++--------- 4 files changed, 66 insertions(+), 88 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 29426bc97566..6856f24f248d 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -20,6 +20,7 @@ package org.apache.iceberg.data.orc; import java.util.List; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; @@ -37,13 +38,13 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; public class GenericOrcWriter implements OrcRowWriter { - private final OrcValueWriter writer; + private final RecordWriter writer; private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) { Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, "Top level must be a struct " + orcSchema); - writer = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); + writer = (RecordWriter) OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); } public static OrcRowWriter buildWriter(Schema expectedSchema, TypeDescription fileSchema) { @@ -115,17 +116,13 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio } @Override - @SuppressWarnings("unchecked") public void write(Record value, VectorizedRowBatch output) { - Preconditions.checkArgument(writer instanceof RecordWriter, "writer must be a RecordWriter."); + Preconditions.checkArgument(value != null, "value must not be null"); int row = output.size; output.size += 1; - List> writers = ((RecordWriter) writer).writers(); - for (int c = 0; c < writers.size(); ++c) { - OrcValueWriter child = writers.get(c); - child.write(row, value.get(c, child.getJavaClass()), output.cols[c]); - } + + writer.rootNonNullWrite(row, value, output); } @Override @@ -133,35 +130,58 @@ public Stream> metrics() { return writer.metrics(); } - private static class RecordWriter implements OrcValueWriter { + public abstract static class StructWriter implements OrcValueWriter { private final List> writers; - RecordWriter(List> writers) { + protected StructWriter(List> writers) { this.writers = writers; } - List> writers() { - return writers; + public List> writers() { + return this.writers; } @Override - public Class getJavaClass() { - return Record.class; + public Stream> metrics() { + return writers.stream().flatMap(OrcValueWriter::metrics); } @Override - @SuppressWarnings("unchecked") - public void nonNullWrite(int rowId, Record data, ColumnVector output) { + public void nonNullWrite(int rowId, S value, ColumnVector output) { StructColumnVector cv = (StructColumnVector) output; + write(rowId, value, c -> cv.fields[c]); + } + + // Special case of writing 
the root struct + public void rootNonNullWrite(int rowId, S value, VectorizedRowBatch output) { + write(rowId, value, c -> output.cols[c]); + } + + private void write(int rowId, S value, Function colVectorAtFunc) { for (int c = 0; c < writers.size(); ++c) { - OrcValueWriter child = writers.get(c); - child.write(rowId, data.get(c, child.getJavaClass()), cv.fields[c]); + OrcValueWriter writer = writers.get(c); + writer.write(rowId, get(value, c), colVectorAtFunc.apply(c)); } } + protected abstract Object get(S struct, int index); + } + + private static class RecordWriter extends StructWriter { + + RecordWriter(List> writers) { + super(writers); + } + @Override - public Stream> metrics() { - return writers.stream().flatMap(OrcValueWriter::metrics); + public Class getJavaClass() { + return Record.class; + } + + @Override + protected Object get(Record struct, int index) { + return struct.get(index, writers().get(index).getJavaClass()); } + } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java index 81f9822815b6..87ec15fdccfa 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java @@ -37,17 +37,10 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; public class FlinkOrcWriter implements OrcRowWriter { - private final FlinkOrcWriters.StructWriter writer; - private final List fieldGetters; + private final FlinkOrcWriters.RowDataWriter writer; private FlinkOrcWriter(RowType rowType, Schema iSchema) { - this.writer = (FlinkOrcWriters.StructWriter) FlinkSchemaVisitor.visit(rowType, iSchema, new WriteBuilder()); - - List fieldTypes = rowType.getChildren(); - this.fieldGetters = Lists.newArrayListWithExpectedSize(fieldTypes.size()); - for (int i = 0; i < fieldTypes.size(); i++) { - fieldGetters.add(RowData.createFieldGetter(fieldTypes.get(i), i)); - } + this.writer = (FlinkOrcWriters.RowDataWriter) FlinkSchemaVisitor.visit(rowType, iSchema, new WriteBuilder()); } public static OrcRowWriter buildWriter(RowType rowType, Schema iSchema) { @@ -55,16 +48,13 @@ public static OrcRowWriter buildWriter(RowType rowType, Schema iSchema) } @Override - @SuppressWarnings("unchecked") public void write(RowData row, VectorizedRowBatch output) { + Preconditions.checkArgument(row != null, "value must not be null"); + int rowId = output.size; output.size += 1; - List> writers = writer.writers(); - for (int c = 0; c < writers.size(); ++c) { - OrcValueWriter child = writers.get(c); - child.write(rowId, fieldGetters.get(c).getFieldOrNull(row), output.cols[c]); - } + writer.rootNonNullWrite(rowId, row, output); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index 38a348995f00..2c27f18a2ce6 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -32,6 +32,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -42,7 +43,6 @@ import 
org.apache.orc.storage.ql.exec.vector.ListColumnVector; import org.apache.orc.storage.ql.exec.vector.LongColumnVector; import org.apache.orc.storage.ql.exec.vector.MapColumnVector; -import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; class FlinkOrcWriters { @@ -90,7 +90,7 @@ static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueW } static OrcValueWriter struct(List> writers, List types) { - return new StructWriter(writers, types); + return new RowDataWriter(writers, types); } private static class StringWriter implements OrcValueWriter { @@ -311,12 +311,11 @@ public Stream> metrics() { } } - static class StructWriter implements OrcValueWriter { - private final List> writers; + static class RowDataWriter extends GenericOrcWriter.StructWriter { private final List fieldGetters; - StructWriter(List> writers, List types) { - this.writers = writers; + RowDataWriter(List> writers, List types) { + super(writers); this.fieldGetters = Lists.newArrayListWithExpectedSize(types.size()); for (int i = 0; i < types.size(); i++) { @@ -324,29 +323,16 @@ static class StructWriter implements OrcValueWriter { } } - List> writers() { - return writers; - } - @Override public Class getJavaClass() { return RowData.class; } @Override - @SuppressWarnings("unchecked") - public void nonNullWrite(int rowId, RowData data, ColumnVector output) { - StructColumnVector cv = (StructColumnVector) output; - for (int c = 0; c < writers.size(); ++c) { - OrcValueWriter writer = writers.get(c); - writer.write(rowId, fieldGetters.get(c).getFieldOrNull(data), cv.fields[c]); - } + protected Object get(RowData struct, int index) { + return fieldGetters.get(index).getFieldOrNull(struct); } - @Override - public Stream> metrics() { - return writers.stream().flatMap(OrcValueWriter::metrics); - } } private static void growColumnVector(ColumnVector cv, int requestedSize) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 1526c3e02ed0..49d5c0fbb5ee 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -21,11 +21,11 @@ import java.io.Serializable; import java.util.List; -import java.util.function.Function; import java.util.stream.Stream; import javax.annotation.Nullable; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcRowWriter; @@ -36,8 +36,6 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; -import org.apache.orc.storage.ql.exec.vector.ColumnVector; -import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; @@ -48,24 +46,23 @@ */ public class SparkOrcWriter implements OrcRowWriter { - private final OrcValueWriter writer; + private final InternalRowWriter writer; public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) { Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, "Top level must be a struct " + orcSchema); - writer = 
OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new WriteBuilder()); + writer = (InternalRowWriter) OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new WriteBuilder()); } @Override public void write(InternalRow value, VectorizedRowBatch output) { - Preconditions.checkArgument(writer instanceof StructWriter, "writer must be StructWriter"); Preconditions.checkArgument(value != null, "value must not be null"); int row = output.size; output.size += 1; - ((StructWriter) writer).rootNonNullWrite(row, value, output); + writer.rootNonNullWrite(row, value, output); } @Override @@ -80,7 +77,7 @@ private WriteBuilder() { @Override public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, List names, List> fields) { - return new StructWriter(fields, record.getChildren()); + return new InternalRowWriter(fields, record.getChildren()); } @Override @@ -130,37 +127,18 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio } } - private static class StructWriter implements OrcValueWriter { - private final List writers; + private static class InternalRowWriter extends GenericOrcWriter.StructWriter { private final List> fieldGetters; - StructWriter(List> writers, List orcTypes) { - this.writers = Lists.newArrayListWithExpectedSize(writers.size()); + InternalRowWriter(List> writers, List orcTypes) { + super(writers); this.fieldGetters = Lists.newArrayListWithExpectedSize(orcTypes.size()); for (int i = 0; i < orcTypes.size(); i++) { - this.writers.add(writers.get(i)); fieldGetters.add(createFieldGetter(orcTypes.get(i), orcTypes.size())); } } - @Override - public void nonNullWrite(int rowId, InternalRow value, ColumnVector output) { - StructColumnVector cv = (StructColumnVector) output; - write(rowId, value, c -> cv.fields[c]); - } - - // Special case of writing the root struct - public void rootNonNullWrite(int rowId, InternalRow value, VectorizedRowBatch output) { - write(rowId, value, c -> output.cols[c]); - } - - private void write(int rowId, InternalRow value, Function colVectorAtFunc) { - for (int c = 0; c < writers.size(); ++c) { - writers.get(c).write(rowId, fieldGetters.get(c).getFieldOrNull(value, c), colVectorAtFunc.apply(c)); - } - } - @Override public Class getJavaClass() { return InternalRow.class; @@ -168,9 +146,13 @@ public Class getJavaClass() { @Override public Stream> metrics() { - return writers.stream().flatMap(OrcValueWriter::metrics); + return writers().stream().flatMap(OrcValueWriter::metrics); } + @Override + protected Object get(InternalRow struct, int index) { + return fieldGetters.get(index).getFieldOrNull(struct, index); + } } static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldCount) { From 62e977bba05bf32dcb51388b686444f57f621ee5 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Sat, 9 Oct 2021 12:33:19 +0200 Subject: [PATCH 09/11] refactor: fixed the bug of supplying wrong field count to Spark ORC struct writes --- .../org/apache/iceberg/spark/data/SparkOrcValueWriters.java | 6 +++--- .../java/org/apache/iceberg/spark/data/SparkOrcWriter.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index 38170fa51e5b..a56977ffdfdf 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -138,7 +138,7 @@ private static class
ListWriter implements OrcValueWriter { throw new IllegalArgumentException("Expected one (and same) ORC type for list elements, got: " + orcTypes); } this.writer = writer; - this.fieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 1); + this.fieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0)); } @Override @@ -179,8 +179,8 @@ private static class MapWriter implements OrcValueWriter { } this.keyWriter = keyWriter; this.valueWriter = valueWriter; - this.keyFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0), 1); - this.valueFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(1), 1); + this.keyFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0)); + this.valueFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(1)); } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 49d5c0fbb5ee..b981901f1fd0 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -135,7 +135,7 @@ private static class InternalRowWriter extends GenericOrcWriter.StructWriter createFieldGetter(TypeDescription fieldType, int fieldCount) { + static FieldGetter createFieldGetter(TypeDescription fieldType) { final FieldGetter fieldGetter; switch (fieldType.getCategory()) { case BOOLEAN: @@ -197,7 +197,7 @@ static FieldGetter createFieldGetter(TypeDescription fieldType, int fieldCoun fieldGetter = (row, ordinal) -> row.getUTF8String(ordinal); break; case STRUCT: - fieldGetter = (row, ordinal) -> row.getStruct(ordinal, fieldCount); + fieldGetter = (row, ordinal) -> row.getStruct(ordinal, fieldType.getChildren().size()); break; case LIST: fieldGetter = (row, ordinal) -> row.getArray(ordinal); From 96bfddc4fc04066d874cb35aef7d006966439198 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Mon, 11 Oct 2021 13:31:14 +0200 Subject: [PATCH 10/11] refactor: review round --- .../iceberg/data/orc/GenericOrcWriter.java | 49 +------------------ .../iceberg/data/orc/GenericOrcWriters.java | 43 ++++++++++++++++ .../iceberg/flink/data/FlinkOrcWriter.java | 6 +-- .../iceberg/flink/data/FlinkOrcWriters.java | 4 +- .../iceberg/spark/data/SparkOrcWriter.java | 33 ++++++------- 5 files changed, 62 insertions(+), 73 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 6856f24f248d..fcbc66a19817 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -20,7 +20,6 @@ package org.apache.iceberg.data.orc; import java.util.List; -import java.util.function.Function; import java.util.stream.Stream; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; @@ -33,8 +32,6 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; -import org.apache.orc.storage.ql.exec.vector.ColumnVector; -import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; public class GenericOrcWriter implements OrcRowWriter { @@ -118,11 +115,7 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio @Override public void write(Record value, VectorizedRowBatch output) { Preconditions.checkArgument(value != null, "value must 
not be null"); - - int row = output.size; - output.size += 1; - - writer.rootNonNullWrite(row, value, output); + writer.rootNonNullWrite(value, output); } @Override @@ -130,44 +123,7 @@ public Stream> metrics() { return writer.metrics(); } - public abstract static class StructWriter implements OrcValueWriter { - private final List> writers; - - protected StructWriter(List> writers) { - this.writers = writers; - } - - public List> writers() { - return this.writers; - } - - @Override - public Stream> metrics() { - return writers.stream().flatMap(OrcValueWriter::metrics); - } - - @Override - public void nonNullWrite(int rowId, S value, ColumnVector output) { - StructColumnVector cv = (StructColumnVector) output; - write(rowId, value, c -> cv.fields[c]); - } - - // Special case of writing the root struct - public void rootNonNullWrite(int rowId, S value, VectorizedRowBatch output) { - write(rowId, value, c -> output.cols[c]); - } - - private void write(int rowId, S value, Function colVectorAtFunc) { - for (int c = 0; c < writers.size(); ++c) { - OrcValueWriter writer = writers.get(c); - writer.write(rowId, get(value, c), colVectorAtFunc.apply(c)); - } - } - - protected abstract Object get(S struct, int index); - } - - private static class RecordWriter extends StructWriter { + private static class RecordWriter extends GenericOrcWriters.StructWriter { RecordWriter(List> writers) { super(writers); @@ -182,6 +138,5 @@ public Class getJavaClass() { protected Object get(Record struct, int index) { return struct.get(index, writers().get(index).getJavaClass()); } - } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 7efa1613de97..7f4a8ad48690 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.iceberg.DoubleFieldMetrics; import org.apache.iceberg.FieldMetrics; @@ -48,7 +49,9 @@ import org.apache.orc.storage.ql.exec.vector.ListColumnVector; import org.apache.orc.storage.ql.exec.vector.LongColumnVector; import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; public class GenericOrcWriters { private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); @@ -531,6 +534,46 @@ public Stream> metrics() { } } + public abstract static class StructWriter implements OrcValueWriter { + private final List> writers; + + protected StructWriter(List> writers) { + this.writers = writers; + } + + public List> writers() { + return writers; + } + + @Override + public Stream> metrics() { + return writers.stream().flatMap(OrcValueWriter::metrics); + } + + @Override + public void nonNullWrite(int rowId, S value, ColumnVector output) { + StructColumnVector cv = (StructColumnVector) output; + write(rowId, value, c -> cv.fields[c]); + } + + // Special case of writing the root struct + public void rootNonNullWrite(S value, VectorizedRowBatch output) { + int rowId = output.size; + output.size += 1; + write(rowId, value, c -> output.cols[c]); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void 
write(int rowId, S value, Function colVectorAtFunc) { + for (int c = 0; c < writers.size(); ++c) { + OrcValueWriter writer = writers.get(c); + writer.write(rowId, get(value, c), colVectorAtFunc.apply(c)); + } + } + + protected abstract Object get(S struct, int index); + } + private static void growColumnVector(ColumnVector cv, int requestedSize) { if (cv.isNull.length < requestedSize) { // Use growth factor of 3 to avoid frequent array allocations diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java index 87ec15fdccfa..696a23807e65 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java @@ -50,11 +50,7 @@ public static OrcRowWriter buildWriter(RowType rowType, Schema iSchema) @Override public void write(RowData row, VectorizedRowBatch output) { Preconditions.checkArgument(row != null, "value must not be null"); - - int rowId = output.size; - output.size += 1; - - writer.rootNonNullWrite(rowId, row, output); + writer.rootNonNullWrite(row, output); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index 2c27f18a2ce6..4ba81ccd3434 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -32,7 +32,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -311,7 +311,7 @@ public Stream> metrics() { } } - static class RowDataWriter extends GenericOrcWriter.StructWriter { + static class RowDataWriter extends GenericOrcWriters.StructWriter { private final List fieldGetters; RowDataWriter(List> writers, List types) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index b981901f1fd0..2d35e3b0688c 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -25,7 +25,6 @@ import javax.annotation.Nullable; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; -import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcRowWriter; @@ -58,11 +57,7 @@ public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) { @Override public void write(InternalRow value, VectorizedRowBatch output) { Preconditions.checkArgument(value != null, "value must not be null"); - - int row = output.size; - output.size += 1; - - writer.rootNonNullWrite(row, value, output); + writer.rootNonNullWrite(value, output); } @Override @@ -127,7 +122,7 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio } } - private static class InternalRowWriter extends GenericOrcWriter.StructWriter { + private static class InternalRowWriter extends 
GenericOrcWriters.StructWriter { private final List> fieldGetters; InternalRowWriter(List> writers, List orcTypes) { @@ -159,31 +154,31 @@ static FieldGetter createFieldGetter(TypeDescription fieldType) { final FieldGetter fieldGetter; switch (fieldType.getCategory()) { case BOOLEAN: - fieldGetter = (row, ordinal) -> row.getBoolean(ordinal); + fieldGetter = SpecializedGetters::getBoolean; break; case BYTE: - fieldGetter = (row, ordinal) -> row.getByte(ordinal); + fieldGetter = SpecializedGetters::getByte; break; case SHORT: - fieldGetter = (row, ordinal) -> row.getShort(ordinal); + fieldGetter = SpecializedGetters::getShort; break; case DATE: case INT: - fieldGetter = (row, ordinal) -> row.getInt(ordinal); + fieldGetter = SpecializedGetters::getInt; break; case LONG: case TIMESTAMP: case TIMESTAMP_INSTANT: - fieldGetter = (row, ordinal) -> row.getLong(ordinal); + fieldGetter = SpecializedGetters::getLong; break; case FLOAT: - fieldGetter = (row, ordinal) -> row.getFloat(ordinal); + fieldGetter = SpecializedGetters::getFloat; break; case DOUBLE: - fieldGetter = (row, ordinal) -> row.getDouble(ordinal); + fieldGetter = SpecializedGetters::getDouble; break; case BINARY: - fieldGetter = (row, ordinal) -> row.getBinary(ordinal); + fieldGetter = SpecializedGetters::getBinary; // getBinary always makes a copy, so we don't need to worry about it // being changed behind our back. break; @@ -194,19 +189,19 @@ static FieldGetter createFieldGetter(TypeDescription fieldType) { case STRING: case CHAR: case VARCHAR: - fieldGetter = (row, ordinal) -> row.getUTF8String(ordinal); + fieldGetter = SpecializedGetters::getUTF8String; break; case STRUCT: fieldGetter = (row, ordinal) -> row.getStruct(ordinal, fieldType.getChildren().size()); break; case LIST: - fieldGetter = (row, ordinal) -> row.getArray(ordinal); + fieldGetter = SpecializedGetters::getArray; break; case MAP: - fieldGetter = (row, ordinal) -> row.getMap(ordinal); + fieldGetter = SpecializedGetters::getMap; break; default: - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Encountered an unsupported ORC type during a write from Spark."); } return (row, ordinal) -> { From 53dc45d77e65fc612a914a42fd990ef246fbc5ad Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 13 Oct 2021 11:38:10 +0200 Subject: [PATCH 11/11] review round 2 --- .../iceberg/data/orc/GenericOrcWriter.java | 9 +- .../iceberg/data/orc/GenericOrcWriters.java | 97 +------------------ .../iceberg/flink/data/FlinkOrcWriter.java | 2 +- .../iceberg/flink/data/FlinkOrcWriters.java | 50 ---------- .../apache/iceberg/orc/OrcValueWriter.java | 2 - .../spark/data/SparkOrcValueWriters.java | 58 ++++------- .../iceberg/spark/data/SparkOrcWriter.java | 12 +-- 7 files changed, 23 insertions(+), 207 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index fcbc66a19817..4e0cb7793537 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -115,7 +115,7 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio @Override public void write(Record value, VectorizedRowBatch output) { Preconditions.checkArgument(value != null, "value must not be null"); - writer.rootNonNullWrite(value, output); + writer.writeRow(value, output); } @Override @@ -129,14 +129,9 @@ private static class RecordWriter extends GenericOrcWriters.StructWriter 
super(writers); } - @Override - public Class getJavaClass() { - return Record.class; - } - @Override protected Object get(Record struct, int index) { - return struct.get(index, writers().get(index).getJavaClass()); + return struct.get(index); } } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 7f4a8ad48690..e0d2c5aab90b 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -141,11 +141,6 @@ public static OrcValueWriter> map(OrcValueWriter key, OrcVal private static class BooleanWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new BooleanWriter(); - @Override - public Class getJavaClass() { - return Boolean.class; - } - @Override public void nonNullWrite(int rowId, Boolean data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data ? 1 : 0; @@ -155,11 +150,6 @@ public void nonNullWrite(int rowId, Boolean data, ColumnVector output) { private static class ByteWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new ByteWriter(); - @Override - public Class getJavaClass() { - return Byte.class; - } - @Override public void nonNullWrite(int rowId, Byte data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; @@ -169,11 +159,6 @@ public void nonNullWrite(int rowId, Byte data, ColumnVector output) { private static class ShortWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new ShortWriter(); - @Override - public Class getJavaClass() { - return Short.class; - } - @Override public void nonNullWrite(int rowId, Short data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; @@ -183,11 +168,6 @@ public void nonNullWrite(int rowId, Short data, ColumnVector output) { private static class IntWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new IntWriter(); - @Override - public Class getJavaClass() { - return Integer.class; - } - @Override public void nonNullWrite(int rowId, Integer data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; @@ -197,11 +177,6 @@ public void nonNullWrite(int rowId, Integer data, ColumnVector output) { private static class TimeWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new TimeWriter(); - @Override - public Class getJavaClass() { - return LocalTime.class; - } - @Override public void nonNullWrite(int rowId, LocalTime data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; @@ -211,11 +186,6 @@ public void nonNullWrite(int rowId, LocalTime data, ColumnVector output) { private static class LongWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new LongWriter(); - @Override - public Class getJavaClass() { - return Long.class; - } - @Override public void nonNullWrite(int rowId, Long data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; @@ -230,11 +200,6 @@ private FloatWriter(int id) { this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); } - @Override - public Class getJavaClass() { - return Float.class; - } - @Override public void nonNullWrite(int rowId, Float data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; @@ -264,11 +229,6 @@ private DoubleWriter(Integer id) { 
this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id); } - @Override - public Class getJavaClass() { - return Double.class; - } - @Override public void nonNullWrite(int rowId, Double data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; @@ -293,11 +253,6 @@ public Stream> metrics() { private static class StringWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new StringWriter(); - @Override - public Class getJavaClass() { - return String.class; - } - @Override public void nonNullWrite(int rowId, String data, ColumnVector output) { byte[] value = data.getBytes(StandardCharsets.UTF_8); @@ -308,11 +263,6 @@ public void nonNullWrite(int rowId, String data, ColumnVector output) { private static class ByteBufferWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new ByteBufferWriter(); - @Override - public Class getJavaClass() { - return ByteBuffer.class; - } - @Override public void nonNullWrite(int rowId, ByteBuffer data, ColumnVector output) { if (data.hasArray()) { @@ -328,11 +278,6 @@ public void nonNullWrite(int rowId, ByteBuffer data, ColumnVector output) { private static class UUIDWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new UUIDWriter(); - @Override - public Class getJavaClass() { - return UUID.class; - } - @Override @SuppressWarnings("ByteBufferBackingArray") public void nonNullWrite(int rowId, UUID data, ColumnVector output) { @@ -346,11 +291,6 @@ public void nonNullWrite(int rowId, UUID data, ColumnVector output) { private static class ByteArrayWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new ByteArrayWriter(); - @Override - public Class getJavaClass() { - return byte[].class; - } - @Override public void nonNullWrite(int rowId, byte[] data, ColumnVector output) { ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); @@ -360,11 +300,6 @@ public void nonNullWrite(int rowId, byte[] data, ColumnVector output) { private static class DateWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new DateWriter(); - @Override - public Class getJavaClass() { - return LocalDate.class; - } - @Override public void nonNullWrite(int rowId, LocalDate data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); @@ -374,11 +309,6 @@ public void nonNullWrite(int rowId, LocalDate data, ColumnVector output) { private static class TimestampTzWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new TimestampTzWriter(); - @Override - public Class getJavaClass() { - return OffsetDateTime.class; - } - @Override public void nonNullWrite(int rowId, OffsetDateTime data, ColumnVector output) { TimestampColumnVector cv = (TimestampColumnVector) output; @@ -392,11 +322,6 @@ public void nonNullWrite(int rowId, OffsetDateTime data, ColumnVector output) { private static class TimestampWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new TimestampWriter(); - @Override - public Class getJavaClass() { - return LocalDateTime.class; - } - @Override public void nonNullWrite(int rowId, LocalDateTime data, ColumnVector output) { TimestampColumnVector cv = (TimestampColumnVector) output; @@ -415,11 +340,6 @@ private static class Decimal18Writer implements OrcValueWriter { this.scale = scale; } - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - @Override public void 
nonNullWrite(int rowId, BigDecimal data, ColumnVector output) { Preconditions.checkArgument(data.scale() == scale, @@ -441,11 +361,6 @@ private static class Decimal38Writer implements OrcValueWriter { this.scale = scale; } - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - @Override public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) { Preconditions.checkArgument(data.scale() == scale, @@ -464,11 +379,6 @@ private static class ListWriter implements OrcValueWriter> { this.element = element; } - @Override - public Class getJavaClass() { - return List.class; - } - @Override public void nonNullWrite(int rowId, List value, ColumnVector output) { ListColumnVector cv = (ListColumnVector) output; @@ -499,11 +409,6 @@ private static class MapWriter implements OrcValueWriter> { this.valueWriter = valueWriter; } - @Override - public Class getJavaClass() { - return Map.class; - } - @Override public void nonNullWrite(int rowId, Map map, ColumnVector output) { List keys = Lists.newArrayListWithExpectedSize(map.size()); @@ -557,7 +462,7 @@ public void nonNullWrite(int rowId, S value, ColumnVector output) { } // Special case of writing the root struct - public void rootNonNullWrite(S value, VectorizedRowBatch output) { + public void writeRow(S value, VectorizedRowBatch output) { int rowId = output.size; output.size += 1; write(rowId, value, c -> output.cols[c]); diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java index 696a23807e65..3f469b755f6b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java @@ -50,7 +50,7 @@ public static OrcRowWriter buildWriter(RowType rowType, Schema iSchema) @Override public void write(RowData row, VectorizedRowBatch output) { Preconditions.checkArgument(row != null, "value must not be null"); - writer.rootNonNullWrite(row, output); + writer.writeRow(row, output); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index 4ba81ccd3434..6b596ac2063c 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -96,11 +96,6 @@ static OrcValueWriter struct(List> writers, List { private static final StringWriter INSTANCE = new StringWriter(); - @Override - public Class getJavaClass() { - return StringData.class; - } - @Override public void nonNullWrite(int rowId, StringData data, ColumnVector output) { byte[] value = data.toBytes(); @@ -111,11 +106,6 @@ public void nonNullWrite(int rowId, StringData data, ColumnVector output) { private static class DateWriter implements OrcValueWriter { private static final DateWriter INSTANCE = new DateWriter(); - @Override - public Class getJavaClass() { - return Integer.class; - } - @Override public void nonNullWrite(int rowId, Integer data, ColumnVector output) { ((LongColumnVector) output).vector[rowId] = data; @@ -125,11 +115,6 @@ public void nonNullWrite(int rowId, Integer data, ColumnVector output) { private static class TimeWriter implements OrcValueWriter { private static final TimeWriter INSTANCE = new TimeWriter(); - @Override - public Class getJavaClass() { - return Integer.class; - } - @Override public void nonNullWrite(int rowId, Integer millis, ColumnVector output) { 
// The time in flink is in millisecond, while the standard time in iceberg is microsecond. @@ -141,11 +126,6 @@ public void nonNullWrite(int rowId, Integer millis, ColumnVector output) { private static class TimestampWriter implements OrcValueWriter { private static final TimestampWriter INSTANCE = new TimestampWriter(); - @Override - public Class getJavaClass() { - return TimestampData.class; - } - @Override public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) { TimestampColumnVector cv = (TimestampColumnVector) output; @@ -161,11 +141,6 @@ public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) { private static class TimestampTzWriter implements OrcValueWriter { private static final TimestampTzWriter INSTANCE = new TimestampTzWriter(); - @Override - public Class getJavaClass() { - return TimestampData.class; - } - @Override public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) { TimestampColumnVector cv = (TimestampColumnVector) output; @@ -186,11 +161,6 @@ private static class Decimal18Writer implements OrcValueWriter { this.scale = scale; } - @Override - public Class getJavaClass() { - return DecimalData.class; - } - @Override public void nonNullWrite(int rowId, DecimalData data, ColumnVector output) { Preconditions.checkArgument(scale == data.scale(), @@ -211,11 +181,6 @@ private static class Decimal38Writer implements OrcValueWriter { this.scale = scale; } - @Override - public Class getJavaClass() { - return DecimalData.class; - } - @Override public void nonNullWrite(int rowId, DecimalData data, ColumnVector output) { Preconditions.checkArgument(scale == data.scale(), @@ -236,11 +201,6 @@ static class ListWriter implements OrcValueWriter { this.elementGetter = ArrayData.createElementGetter(elementType); } - @Override - public Class getJavaClass() { - return ArrayData.class; - } - @Override @SuppressWarnings("unchecked") public void nonNullWrite(int rowId, ArrayData data, ColumnVector output) { @@ -278,11 +238,6 @@ static class MapWriter implements OrcValueWriter { this.valueGetter = ArrayData.createElementGetter(valueType); } - @Override - public Class getJavaClass() { - return MapData.class; - } - @Override @SuppressWarnings("unchecked") public void nonNullWrite(int rowId, MapData data, ColumnVector output) { @@ -323,11 +278,6 @@ static class RowDataWriter extends GenericOrcWriters.StructWriter { } } - @Override - public Class getJavaClass() { - return RowData.class; - } - @Override protected Object get(RowData struct, int index) { return fieldGetters.get(index).getFieldOrNull(struct); diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java index b6030abb7a78..d8c27ac30879 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java @@ -25,8 +25,6 @@ public interface OrcValueWriter { - Class getJavaClass(); - /** * Take a value from the data value and add it to the ORC output. 
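 * <p>[Editor's note - illustrative addition, not in the original interface doc:] for a
 * primitive writer this typically means a direct store into the matching vector, e.g.
 * {@code ((LongColumnVector) output).vector[rowId] = data;} as in LongWriter above.</p>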
* diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index a56977ffdfdf..abb12dffc050 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -57,12 +57,12 @@ static OrcValueWriter decimal(int precision, int scale) { } static OrcValueWriter list(OrcValueWriter element, List orcType) { - return new ListWriter(element, orcType); + return new ListWriter<>(element, orcType); } static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueWriter valueWriter, - List orcType) { - return new MapWriter(keyWriter, valueWriter, orcType); + List orcTypes) { + return new MapWriter<>(keyWriter, valueWriter, orcTypes); } private static class StringWriter implements OrcValueWriter { @@ -74,10 +74,6 @@ public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) { ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); } - @Override - public Class getJavaClass() { - return UTF8String.class; - } } private static class TimestampTzWriter implements OrcValueWriter { @@ -90,10 +86,6 @@ public void nonNullWrite(int rowId, Long micros, ColumnVector output) { cv.nanos[rowId] = (int) Math.floorMod(micros, 1_000_000) * 1_000; // nanos } - @Override - public Class getJavaClass() { - return Long.class; - } } private static class Decimal18Writer implements OrcValueWriter { @@ -109,10 +101,6 @@ public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) { decimal.toUnscaledLong(), scale); } - @Override - public Class getJavaClass() { - return Decimal.class; - } } private static class Decimal38Writer implements OrcValueWriter { @@ -123,22 +111,19 @@ public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) { HiveDecimal.create(decimal.toJavaBigDecimal())); } - @Override - public Class getJavaClass() { - return Decimal.class; - } } - private static class ListWriter implements OrcValueWriter { - private final OrcValueWriter writer; - private final SparkOrcWriter.FieldGetter fieldGetter; + private static class ListWriter implements OrcValueWriter { + private final OrcValueWriter writer; + private final SparkOrcWriter.FieldGetter fieldGetter; - ListWriter(OrcValueWriter writer, List orcTypes) { + @SuppressWarnings("unchecked") + ListWriter(OrcValueWriter writer, List orcTypes) { if (orcTypes.size() != 1) { throw new IllegalArgumentException("Expected one (and same) ORC type for list elements, got: " + orcTypes); } this.writer = writer; - this.fieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0)); + this.fieldGetter = (SparkOrcWriter.FieldGetter) SparkOrcWriter.createFieldGetter(orcTypes.get(0)); } @Override @@ -161,26 +146,23 @@ public Stream> metrics() { return writer.metrics(); } - @Override - public Class getJavaClass() { - return ArrayData.class; - } } - private static class MapWriter implements OrcValueWriter { - private final OrcValueWriter keyWriter; - private final OrcValueWriter valueWriter; - private final SparkOrcWriter.FieldGetter keyFieldGetter; - private final SparkOrcWriter.FieldGetter valueFieldGetter; + private static class MapWriter implements OrcValueWriter { + private final OrcValueWriter keyWriter; + private final OrcValueWriter valueWriter; + private final SparkOrcWriter.FieldGetter keyFieldGetter; + private final SparkOrcWriter.FieldGetter valueFieldGetter; - MapWriter(OrcValueWriter keyWriter, OrcValueWriter 
valueWriter, List orcTypes) { + @SuppressWarnings("unchecked") + MapWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter, List orcTypes) { if (orcTypes.size() != 2) { throw new IllegalArgumentException("Expected two ORC type descriptions for a map, got: " + orcTypes); } this.keyWriter = keyWriter; this.valueWriter = valueWriter; - this.keyFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(0)); - this.valueFieldGetter = SparkOrcWriter.createFieldGetter(orcTypes.get(1)); + this.keyFieldGetter = (SparkOrcWriter.FieldGetter) SparkOrcWriter.createFieldGetter(orcTypes.get(0)); + this.valueFieldGetter = (SparkOrcWriter.FieldGetter) SparkOrcWriter.createFieldGetter(orcTypes.get(1)); } @Override @@ -208,10 +190,6 @@ public Stream> metrics() { return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); } - @Override - public Class getJavaClass() { - return MapData.class; - } } private static void growColumnVector(ColumnVector cv, int requestedSize) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 2d35e3b0688c..34292f23b135 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -57,7 +57,7 @@ public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) { @Override public void write(InternalRow value, VectorizedRowBatch output) { Preconditions.checkArgument(value != null, "value must not be null"); - writer.rootNonNullWrite(value, output); + writer.writeRow(value, output); } @Override @@ -134,16 +134,6 @@ private static class InternalRowWriter extends GenericOrcWriters.StructWriter getJavaClass() { - return InternalRow.class; - } - - @Override - public Stream> metrics() { - return writers().stream().flatMap(OrcValueWriter::metrics); - } - @Override protected Object get(InternalRow struct, int index) { return fieldGetters.get(index).getFieldOrNull(struct, index);
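[Editor's note - a minimal sketch of where the series lands, not part of the patches. It assumes the classes shown above (GenericOrcWriters.StructWriter with writeRow(), OrcRowWriter, VectorizedRowBatch, Preconditions); ExampleOrcWriter itself is hypothetical, standing in for SparkOrcWriter/FlinkOrcWriter/GenericOrcWriter.]

import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.data.orc.GenericOrcWriters;
import org.apache.iceberg.orc.OrcRowWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.spark.sql.catalyst.InternalRow;

public class ExampleOrcWriter implements OrcRowWriter<InternalRow> {
  // A StructWriter subclass (like InternalRowWriter above) built by the schema visitor.
  private final GenericOrcWriters.StructWriter<InternalRow> writer;

  ExampleOrcWriter(GenericOrcWriters.StructWriter<InternalRow> writer) {
    this.writer = writer;
  }

  @Override
  public void write(InternalRow row, VectorizedRowBatch output) {
    Preconditions.checkArgument(row != null, "value must not be null");
    // writeRow() claims the next row slot (output.size) and fans each column c
    // out to writers().get(c).write(rowId, get(row, c), output.cols[c]).
    writer.writeRow(row, output);
  }

  @Override
  public Stream<FieldMetrics<?>> metrics() {
    return writer.metrics();
  }
}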