diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
new file mode 100644
index 000000000000..b59db22fd3fd
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+
+
+class Deserializer {
+  private FieldDeserializer fieldDeserializer;
+
+  /**
+   * Builder to create a Deserializer instance.
+   * Requires an Iceberg Schema and the Hive ObjectInspector for converting the data.
+   */
+  static class Builder {
+    private Schema schema;
+    private ObjectInspector inspector;
+
+    Builder schema(Schema mainSchema) {
+      this.schema = mainSchema;
+      return this;
+    }
+
+    Builder inspector(ObjectInspector mainInspector) {
+      this.inspector = mainInspector;
+      return this;
+    }
+
+    Deserializer build() {
+      return new Deserializer(schema, inspector);
+    }
+  }
+
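+  // A minimal usage sketch (variable names are illustrative; see TestDeserializer in this change
+  // for complete, working examples):
+  //
+  //   Deserializer deserializer = new Deserializer.Builder()
+  //       .schema(icebergSchema)
+  //       .inspector(structObjectInspector)
+  //       .build();
+  //   Record record = deserializer.deserialize(hiveRowData);
+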
+  /**
+   * Deserializes the Hive result object to an Iceberg record using the provided ObjectInspectors.
+   * @param data The Hive data to deserialize
+   * @return The resulting Iceberg Record
+   */
+  Record deserialize(Object data) {
+    return (Record) fieldDeserializer.value(data);
+  }
+
+  private Deserializer(Schema schema, ObjectInspector fieldInspector) {
+    this.fieldDeserializer = DeserializerVisitor.visit(schema, fieldInspector);
+  }
+
+  private static class DeserializerVisitor extends SchemaWithPartnerVisitor<ObjectInspector, FieldDeserializer> {
+
+    public static FieldDeserializer visit(Schema schema, ObjectInspector objectInspector) {
+      return visit(schema, new FixNameMappingObjectInspector(schema, objectInspector), new DeserializerVisitor(),
+          new PartnerObjectInspectorByNameAccessors());
+    }
+
+    @Override
+    public FieldDeserializer schema(Schema schema, ObjectInspector inspector, FieldDeserializer deserializer) {
+      return deserializer;
+    }
+
+    @Override
+    public FieldDeserializer field(NestedField field, ObjectInspector inspector, FieldDeserializer deserializer) {
+      return deserializer;
+    }
+
+    @Override
+    public FieldDeserializer primitive(PrimitiveType type, ObjectInspector inspector) {
+      switch (type.typeId()) {
+        case BOOLEAN:
+        case INTEGER:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case STRING:
+          // Generic conversions where Iceberg and Hive use the same Java object
+          return o -> ((PrimitiveObjectInspector) inspector).getPrimitiveJavaObject(o);
+        case UUID:
+          // TODO: This will not work with Parquet. Parquet expects a byte[] for UUIDs, while the other
+          // file formats expect a UUID object
+          return o -> UUID.fromString(((StringObjectInspector) inspector).getPrimitiveJavaObject(o));
+        case DATE:
+        case TIMESTAMP:
+        case FIXED:
+        case BINARY:
+        case DECIMAL:
+          // Iceberg specific conversions
+          return o -> ((WriteObjectInspector) inspector).convert(o);
+        case TIME:
+        default:
+          throw new IllegalArgumentException("Unsupported column type: " + type);
+      }
+    }
+
+    @Override
+    public FieldDeserializer struct(StructType type, ObjectInspector inspector, List<FieldDeserializer> deserializers) {
+      return o -> {
+        if (o == null) {
+          return null;
+        }
+
+        List<Object> data = ((StructObjectInspector) inspector).getStructFieldsDataAsList(o);
+        Record result = GenericRecord.create(type);
+
+        for (int i = 0; i < deserializers.size(); i++) {
+          Object fieldValue = data.get(i);
+          if (fieldValue != null) {
+            result.set(i, deserializers.get(i).value(fieldValue));
+          } else {
+            result.set(i, null);
+          }
+        }
+
+        return result;
+      };
+    }
+
+    @Override
+    public FieldDeserializer list(ListType listTypeInfo, ObjectInspector inspector, FieldDeserializer deserializer) {
+      return o -> {
+        if (o == null) {
+          return null;
+        }
+
+        List<Object> result = new ArrayList<>();
+        ListObjectInspector listInspector = (ListObjectInspector) inspector;
+
+        for (Object val : listInspector.getList(o)) {
+          result.add(deserializer.value(val));
+        }
+
+        return result;
+      };
+    }
+
+    @Override
+    public FieldDeserializer map(MapType mapType, ObjectInspector inspector, FieldDeserializer keyDeserializer,
+        FieldDeserializer valueDeserializer) {
+      return o -> {
+        if (o == null) {
+          return null;
+        }
+
+        Map<Object, Object> result = new HashMap<>();
+        MapObjectInspector mapObjectInspector = (MapObjectInspector) inspector;
+
+        for (Map.Entry<?, ?> entry : mapObjectInspector.getMap(o).entrySet()) {
+          result.put(keyDeserializer.value(entry.getKey()), valueDeserializer.value(entry.getValue()));
+        }
+        return result;
+      };
+    }
+  }
+
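+  /**
+   * Supplies the Hive ObjectInspector "partner" for each element of the Iceberg schema so the
+   * visitor above can walk the two trees in lock-step. Struct fields are resolved by name.
+   */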
+  private static class PartnerObjectInspectorByNameAccessors
+      implements SchemaWithPartnerVisitor.PartnerAccessors<ObjectInspector> {
+
+    @Override
+    public ObjectInspector fieldPartner(ObjectInspector inspector, int fieldId, String name) {
+      StructObjectInspector fieldInspector = (StructObjectInspector) inspector;
+      return fieldInspector.getStructFieldRef(name).getFieldObjectInspector();
+    }
+
+    @Override
+    public ObjectInspector mapKeyPartner(ObjectInspector inspector) {
+      MapObjectInspector fieldInspector = (MapObjectInspector) inspector;
+      return fieldInspector.getMapKeyObjectInspector();
+    }
+
+    @Override
+    public ObjectInspector mapValuePartner(ObjectInspector inspector) {
+      MapObjectInspector fieldInspector = (MapObjectInspector) inspector;
+      return fieldInspector.getMapValueObjectInspector();
+    }
+
+    @Override
+    public ObjectInspector listElementPartner(ObjectInspector inspector) {
+      ListObjectInspector fieldInspector = (ListObjectInspector) inspector;
+      return fieldInspector.getListElementObjectInspector();
+    }
+  }
+
+  private interface FieldDeserializer {
+    Object value(Object object);
+  }
+
+  /**
+   * The column names in the Hive query result schema do not match the target Iceberg column names,
+   * so we have to rely on the column order instead. To keep the rest of the code generic we fix this
+   * with a wrapper around the ObjectInspector which resolves the Iceberg schema column names, rather
+   * than the Hive column names, in {@link #getStructFieldRef(String) getStructFieldRef}.
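+   * <p>
+   * For example (mirroring TestDeserializer#testSchemaDeserialize), Hive may expose the columns as
+   * "0:col1" and "1:col2" while the Iceberg schema names them "customer_id" and "first_name".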
+   */
+  private static class FixNameMappingObjectInspector extends StructObjectInspector {
+    private final StructObjectInspector innerInspector;
+    private final Map<String, StructField> nameMap;
+
+    private FixNameMappingObjectInspector(Schema schema, ObjectInspector inspector) {
+      this.nameMap = new HashMap<>(schema.columns().size());
+      this.innerInspector = (StructObjectInspector) inspector;
+      List<? extends StructField> fields = innerInspector.getAllStructFieldRefs();
+
+      for (int i = 0; i < schema.columns().size(); ++i) {
+        nameMap.put(schema.columns().get(i).name(), fields.get(i));
+      }
+    }
+
+    @Override
+    public List<? extends StructField> getAllStructFieldRefs() {
+      return innerInspector.getAllStructFieldRefs();
+    }
+
+    @Override
+    public StructField getStructFieldRef(String fieldName) {
+      return nameMap.get(fieldName);
+    }
+
+    @Override
+    public Object getStructFieldData(Object data, StructField fieldRef) {
+      return innerInspector.getStructFieldData(data, fieldRef);
+    }
+
+    @Override
+    public List<Object> getStructFieldsDataAsList(Object data) {
+      return innerInspector.getStructFieldsDataAsList(data);
+    }
+
+    @Override
+    public String getTypeName() {
+      return innerInspector.getTypeName();
+    }
+
+    @Override
+    public Category getCategory() {
+      return innerInspector.getCategory();
+    }
+  }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
index c4e4a2fd848c..de6a26f4a469 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
@@ -28,13 +28,18 @@ import org.apache.iceberg.util.ByteBuffers;
 
 public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJavaObjectInspector
-    implements BinaryObjectInspector {
+    implements BinaryObjectInspector, WriteObjectInspector {
 
   private static final IcebergBinaryObjectInspector BYTE_ARRAY = new IcebergBinaryObjectInspector() {
     @Override
     byte[] toByteArray(Object o) {
       return (byte[]) o;
     }
+
+    @Override
+    public byte[] convert(Object o) {
+      return o == null ? null : ((BytesWritable) o).getBytes();
+    }
   };
 
   private static final IcebergBinaryObjectInspector BYTE_BUFFER = new IcebergBinaryObjectInspector() {
@@ -42,6 +47,11 @@ byte[] toByteArray(Object o) {
     byte[] toByteArray(Object o) {
       return ByteBuffers.toByteArray((ByteBuffer) o);
     }
+
+    @Override
+    public ByteBuffer convert(Object o) {
+      return o == null ? null : ByteBuffer.wrap(((BytesWritable) o).getBytes());
+    }
   };
 
   public static IcebergBinaryObjectInspector byteArray() {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
index 726713d4e7bd..8959780b078d 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
@@ -28,7 +28,7 @@ import org.apache.iceberg.util.DateTimeUtil;
 
 public final class IcebergDateObjectInspector extends AbstractPrimitiveJavaObjectInspector
-    implements DateObjectInspector {
+    implements DateObjectInspector, WriteObjectInspector {
 
   private static final IcebergDateObjectInspector INSTANCE = new IcebergDateObjectInspector();
 
@@ -65,4 +65,8 @@ public Object copyObject(Object o) {
     }
   }
 
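+  // NOTE: relies on the Hive 2 DateWritable API; Hive 3 (DateWritableV2) is not exercised yet,
+  // see the Hive3 assumption in TestDeserializer#testDeserializeEverySupportedType.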
+  @Override
+  public LocalDate convert(Object o) {
+    return o == null ? null : ((DateWritable) o).get().toLocalDate();
+  }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
index aed3fbe760b5..fa023d289004 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
@@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public final class IcebergDecimalObjectInspector extends AbstractPrimitiveJavaObjectInspector
-    implements HiveDecimalObjectInspector {
+    implements HiveDecimalObjectInspector, WriteObjectInspector {
 
   private static final Cache<Integer, IcebergDecimalObjectInspector> CACHE = Caffeine.newBuilder()
       .expireAfterAccess(10, TimeUnit.MINUTES)
@@ -78,4 +78,8 @@ public Object copyObject(Object o) {
     }
   }
 
+  @Override
+  public BigDecimal convert(Object o) {
+    return o == null ? null : ((HiveDecimalWritable) o).getHiveDecimal().bigDecimalValue();
+  }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java
index ac8bffc0c84d..f7a0382af6ec 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java
@@ -41,7 +41,7 @@ public final class IcebergObjectInspector extends TypeUtil.SchemaVisitor<ObjectInspector> {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+  public static List<Object> valuesForTestRecord(Record record) {
+    return Arrays.asList(
+        new BooleanWritable(Boolean.TRUE),
+        new IntWritable(record.get(1, Integer.class)),
+        new LongWritable(record.get(2, Long.class)),
+        new FloatWritable(record.get(3, Float.class)),
+        new DoubleWritable(record.get(4, Double.class)),
+        new DateWritable((int) record.get(5, LocalDate.class).toEpochDay()),
+        // TimeType is not supported
+        new TimestampWritable(Timestamp.from(record.get(6, OffsetDateTime.class).toInstant())),
+        new TimestampWritable(Timestamp.valueOf(record.get(7, LocalDateTime.class))),
+        new Text(record.get(8, String.class)),
+        new Text(record.get(9, UUID.class).toString()),
+        new BytesWritable(record.get(10, byte[].class)),
+        new BytesWritable(ByteBuffers.toByteArray(record.get(11, ByteBuffer.class))),
+        new HiveDecimalWritable(HiveDecimal.create(record.get(12, BigDecimal.class)))
+    );
+  }
+
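+  // Field-by-field record comparison. OffsetDateTime values are compared as instants, since the
+  // deserialized value may come back with a different zone offset, and byte[] values need
+  // Assert.assertArrayEquals instead of Assert.assertEquals.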
+ */ + +package org.apache.iceberg.mr.hive; + +import java.util.Arrays; +import java.util.Collections; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hive.MetastoreUtil; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public class TestDeserializer { + private static final Schema CUSTOMER_SCHEMA = new Schema( + optional(1, "customer_id", Types.LongType.get()), + optional(2, "first_name", Types.StringType.get()) + ); + + private static final StandardStructObjectInspector CUSTOMER_OBJECT_INSPECTOR = + ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("customer_id", "first_name"), + Arrays.asList( + PrimitiveObjectInspectorFactory.writableLongObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector + )); + + @Test + public void testSchemaDeserialize() { + StandardStructObjectInspector schemaObjectInspector = + ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("0:col1", "1:col2"), + Arrays.asList( + PrimitiveObjectInspectorFactory.writableLongObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector + )); + + Deserializer deserializer = new Deserializer.Builder() + .schema(CUSTOMER_SCHEMA) + .inspector(schemaObjectInspector) + .build(); + + Record expected = GenericRecord.create(CUSTOMER_SCHEMA); + expected.set(0, 1L); + expected.set(1, "Bob"); + + Record actual = deserializer.deserialize(new Object[] { new LongWritable(1L), new Text("Bob") }); + + Assert.assertEquals(expected, actual); + } + + @Test + public void testStructDeserialize() { + Deserializer deserializer = new Deserializer.Builder() + .schema(CUSTOMER_SCHEMA) + .inspector(CUSTOMER_OBJECT_INSPECTOR) + .build(); + + Record expected = GenericRecord.create(CUSTOMER_SCHEMA); + expected.set(0, 1L); + expected.set(1, "Bob"); + + Record actual = deserializer.deserialize(new Object[] { new LongWritable(1L), new Text("Bob") }); + + Assert.assertEquals(expected, actual); + } + + @Test + public void testMapDeserialize() { + Schema schema = new Schema( + optional(1, "map_type", Types.MapType.ofOptional(2, 3, + Types.LongType.get(), + Types.StringType.get() + )) + ); + + ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("map_type"), + Arrays.asList( + ObjectInspectorFactory.getStandardMapObjectInspector( + PrimitiveObjectInspectorFactory.writableLongObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector + ) + )); + + Deserializer deserializer = new Deserializer.Builder() + .schema(schema) + .inspector(inspector) + .build(); + + Record expected = GenericRecord.create(schema); + expected.set(0, Collections.singletonMap(1L, "Taylor")); + + MapWritable map = new MapWritable(); + map.put(new LongWritable(1L), new Text("Taylor")); + Object[] data = new Object[] { 
+  @Test
+  public void testListDeserialize() {
+    Schema schema = new Schema(
+        optional(1, "list_type", Types.ListType.ofOptional(2, Types.LongType.get()))
+    );
+
+    ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+        Arrays.asList("list_type"),
+        Arrays.asList(
+            ObjectInspectorFactory.getStandardListObjectInspector(
+                PrimitiveObjectInspectorFactory.writableLongObjectInspector)
+        ));
+
+    Deserializer deserializer = new Deserializer.Builder()
+        .schema(schema)
+        .inspector(inspector)
+        .build();
+
+    Record expected = GenericRecord.create(schema);
+    expected.set(0, Collections.singletonList(1L));
+
+    Object[] data = new Object[] { new Object[] { new LongWritable(1L) } };
+    Record actual = deserializer.deserialize(data);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDeserializeEverySupportedType() {
+    Assume.assumeFalse("No test yet for Hive3 (Date/Timestamp creation)", MetastoreUtil.hive3PresentOnClasspath());
+
+    Deserializer deserializer = new Deserializer.Builder()
+        .schema(HiveIcebergTestUtils.FULL_SCHEMA)
+        .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
+        .build();
+
+    Record expected = HiveIcebergTestUtils.getTestRecord(false);
+    Record actual = deserializer.deserialize(HiveIcebergTestUtils.valuesForTestRecord(expected));
+
+    HiveIcebergTestUtils.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testNullDeserialize() {
+    Deserializer deserializer = new Deserializer.Builder()
+        .schema(HiveIcebergTestUtils.FULL_SCHEMA)
+        .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
+        .build();
+
+    Record expected = HiveIcebergTestUtils.getNullTestRecord();
+
+    Object[] nulls = new Object[HiveIcebergTestUtils.FULL_SCHEMA.columns().size()];
+    Arrays.fill(nulls, null);
+
+    Record actual = deserializer.deserialize(nulls);
+
+    Assert.assertEquals(expected, actual);
+
+    // Check null record as well
+    Assert.assertNull(deserializer.deserialize(null));
+  }
+
+  @Test
+  public void testUnsupportedType() {
+    Schema unsupported = new Schema(
+        optional(1, "time_type", Types.TimeType.get())
+    );
+    StandardStructObjectInspector objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+        Arrays.asList("time_type"),
+        Arrays.asList(
+            PrimitiveObjectInspectorFactory.writableStringObjectInspector
+        ));
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Unsupported column type", () -> {
+          new Deserializer.Builder()
+              .schema(unsupported)
+              .inspector(objectInspector)
+              .build();
+        }
+    );
+  }
+}