From 2a85e723c2043c57c93a6e084c76935fe9dc740d Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 1 Dec 2020 16:58:51 +0100 Subject: [PATCH 1/6] Hive: Implement Deserializer for Hive writes --- .../apache/iceberg/mr/hive/Deserializer.java | 127 ++++++++++++ .../IcebergBinaryObjectInspector.java | 12 +- .../IcebergDateObjectInspector.java | 6 +- .../IcebergDecimalObjectInspector.java | 6 +- .../IcebergObjectInspector.java | 6 +- .../IcebergTimestampObjectInspector.java | 14 +- .../IcebergWriteObjectInspector.java | 24 +++ .../iceberg/mr/hive/HiveIcebergTestUtils.java | 180 ++++++++++++++++++ .../iceberg/mr/hive/TestDeserializer.java | 99 ++++++++++ 9 files changed, 467 insertions(+), 7 deletions(-) create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergWriteObjectInspector.java create mode 100644 mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java create mode 100644 mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java new file mode 100644 index 000000000000..f0ac57dfc8b3 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.List; +import java.util.UUID; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergWriteObjectInspector; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +class Deserializer { + private FiledDeserializer mainDeserializer; + + Deserializer(Schema schema, ObjectInspector fieldInspector) throws SerDeException { + this.mainDeserializer = deserializer(schema.asStruct(), fieldInspector); + } + + Record deserialize(Object data) { + return (Record) mainDeserializer.value(data); + } + + private interface FiledDeserializer { + Object value(Object object); + } + + private static FiledDeserializer deserializer(Type type, ObjectInspector fieldInspector) throws SerDeException { + switch (type.typeId()) { + case BOOLEAN: + return o -> ((BooleanObjectInspector) fieldInspector).get(o); + case INTEGER: + return o -> ((IntObjectInspector) fieldInspector).get(o); + case LONG: + return o -> ((LongObjectInspector) fieldInspector).get(o); + case FLOAT: + return o -> ((FloatObjectInspector) fieldInspector).get(o); + case DOUBLE: + return o -> ((DoubleObjectInspector) fieldInspector).get(o); + case STRING: + return o -> ((StringObjectInspector) fieldInspector).getPrimitiveJavaObject(o); + case UUID: + // TODO: This will not work with Parquet. Parquet UUID expect byte[], others are expecting UUID + return o -> UUID.fromString(((StringObjectInspector) fieldInspector).getPrimitiveJavaObject(o)); + case DATE: + case TIMESTAMP: + case FIXED: + case BINARY: + case DECIMAL: + // Iceberg specific conversions + return o -> ((IcebergWriteObjectInspector) fieldInspector).getIcebergObject(o); + case STRUCT: + return new StructDeserializer((Types.StructType) type, (StructObjectInspector) fieldInspector); + case LIST: + case MAP: + case TIME: + default: + throw new SerDeException("Unsupported column type: " + type); + } + } + + private static class StructDeserializer implements FiledDeserializer { + private final FiledDeserializer[] filedDeserializers; + private final StructObjectInspector fieldInspector; + private final Types.StructType type; + + private StructDeserializer(Types.StructType type, StructObjectInspector fieldInspector) throws SerDeException { + List structFields = fieldInspector.getAllStructFieldRefs(); + List nestedFields = type.fields(); + this.filedDeserializers = new FiledDeserializer[structFields.size()]; + this.fieldInspector = fieldInspector; + this.type = type; + + for (int i = 0; i < filedDeserializers.length; i++) { + filedDeserializers[i] = + deserializer(nestedFields.get(i).type(), structFields.get(i).getFieldObjectInspector()); + } + } + + @Override + public Record value(Object object) { + if (object == null) { + return null; + } + + List data = fieldInspector.getStructFieldsDataAsList(object); + Record result = GenericRecord.create(type); + + for (int i = 0; i < filedDeserializers.length; i++) { + Object fieldValue = data.get(i); + if (fieldValue != null) { + result.set(i, filedDeserializers[i].value(fieldValue)); + } + } + + return result; + } + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java index c4e4a2fd848c..60f89eb11bc9 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java @@ -28,13 +28,18 @@ import org.apache.iceberg.util.ByteBuffers; public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements BinaryObjectInspector { + implements BinaryObjectInspector, IcebergWriteObjectInspector { private static final IcebergBinaryObjectInspector BYTE_ARRAY = new IcebergBinaryObjectInspector() { @Override byte[] toByteArray(Object o) { return (byte[]) o; } + + @Override + public byte[] getIcebergObject(Object o) { + return o == null ? null : ((BytesWritable) o).getBytes(); + } }; private static final IcebergBinaryObjectInspector BYTE_BUFFER = new IcebergBinaryObjectInspector() { @@ -42,6 +47,11 @@ byte[] toByteArray(Object o) { byte[] toByteArray(Object o) { return ByteBuffers.toByteArray((ByteBuffer) o); } + + @Override + public ByteBuffer getIcebergObject(Object o) { + return o == null ? null : ByteBuffer.wrap(((BytesWritable) o).getBytes()); + } }; public static IcebergBinaryObjectInspector byteArray() { diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java index 726713d4e7bd..3a3e51b5131d 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java @@ -28,7 +28,7 @@ import org.apache.iceberg.util.DateTimeUtil; public final class IcebergDateObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements DateObjectInspector { + implements DateObjectInspector, IcebergWriteObjectInspector { private static final IcebergDateObjectInspector INSTANCE = new IcebergDateObjectInspector(); @@ -65,4 +65,8 @@ public Object copyObject(Object o) { } } + @Override + public LocalDate getIcebergObject(Object o) { + return o == null ? null : ((DateWritable) o).get().toLocalDate(); + } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java index aed3fbe760b5..47b4e156fa68 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java @@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public final class IcebergDecimalObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements HiveDecimalObjectInspector { + implements HiveDecimalObjectInspector, IcebergWriteObjectInspector { private static final Cache CACHE = Caffeine.newBuilder() .expireAfterAccess(10, TimeUnit.MINUTES) @@ -78,4 +78,8 @@ public Object copyObject(Object o) { } } + @Override + public BigDecimal getIcebergObject(Object o) { + return o == null ? null : ((HiveDecimalWritable) o).getHiveDecimal().bigDecimalValue(); + } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java index ac8bffc0c84d..f7a0382af6ec 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java @@ -41,7 +41,7 @@ public final class IcebergObjectInspector extends TypeUtil.SchemaVisitor valuesForTestRecord(Record record) { + ByteBuffer byteBuffer = record.get(11, ByteBuffer.class); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.mark(); + byteBuffer.get(bytes); + byteBuffer.reset(); + + return Arrays.asList( + new BooleanWritable(Boolean.TRUE), + new IntWritable(record.get(1, Integer.class)), + new LongWritable(record.get(2, Long.class)), + new FloatWritable(record.get(3, Float.class)), + new DoubleWritable(record.get(4, Double.class)), + new DateWritable((int) record.get(5, LocalDate.class).toEpochDay()), + // TimeType is not supported + // new Timestamp() + new TimestampWritable(Timestamp.from(record.get(6, OffsetDateTime.class).toInstant())), + new TimestampWritable(Timestamp.valueOf(record.get(7, LocalDateTime.class))), + new Text(record.get(8, String.class)), + new Text(record.get(9, UUID.class).toString()), + new BytesWritable(record.get(10, byte[].class)), + new BytesWritable(bytes), + new HiveDecimalWritable(HiveDecimal.create(record.get(12, BigDecimal.class))) + ); + } + + public static void assertEquals(Record expected, Record actual) { + for (int i = 0; i < expected.size(); ++i) { + if (expected.get(i) instanceof OffsetDateTime) { + // For OffsetDateTime we just compare the actual instant + Assert.assertEquals(((OffsetDateTime) expected.get(i)).toInstant(), + ((OffsetDateTime) actual.get(i)).toInstant()); + } else { + if (expected.get(i) instanceof byte[]) { + Assert.assertArrayEquals((byte[]) expected.get(i), (byte[]) actual.get(i)); + } else { + Assert.assertEquals(expected.get(i), actual.get(i)); + } + } + } + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java new file mode 100644 index 000000000000..4871c3610602 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.Arrays; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hive.MetastoreUtil; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public class TestDeserializer { + private static final Schema CUSTOMER_SCHEMA = new Schema( + optional(1, "customer_id", Types.LongType.get()), + optional(2, "first_name", Types.StringType.get()) + ); + + private static final StandardStructObjectInspector CUSTOMER_OBJECT_INSPECTOR = + ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("customer_id", "first_name"), + Arrays.asList( + PrimitiveObjectInspectorFactory.writableLongObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector + )); + + + @Test + public void testSimpleDeserialize() throws SerDeException { + Deserializer deserializer = new Deserializer(CUSTOMER_SCHEMA, CUSTOMER_OBJECT_INSPECTOR); + + Record expected = GenericRecord.create(CUSTOMER_SCHEMA); + expected.set(0, 1L); + expected.set(1, "Bob"); + + Record actual = deserializer.deserialize(new Object[] { new LongWritable(1L), new Text("Bob") }); + + Assert.assertEquals(expected, actual); + } + + @Test + public void testDeserializeEverySupportedType() throws SerDeException { + // No test yet for Hive3 (Date/Timestamp creation) + Assume.assumeFalse(MetastoreUtil.hive3PresentOnClasspath()); + + Deserializer deserializer = new Deserializer(HiveIcebergTestUtils.FULL_SCHEMA, + HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR); + + Record expected = HiveIcebergTestUtils.getTestRecord(false); + Record actual = deserializer.deserialize(HiveIcebergTestUtils.valuesForTestRecord(expected)); + + HiveIcebergTestUtils.assertEquals(expected, actual); + } + + @Test + public void testNullDeserialize() throws SerDeException { + Deserializer deserializer = new Deserializer(HiveIcebergTestUtils.FULL_SCHEMA, + HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR); + + Record expected = HiveIcebergTestUtils.getNullTestRecord(); + + Object[] nulls = new Object[HiveIcebergTestUtils.FULL_SCHEMA.columns().size()]; + Arrays.fill(nulls, null); + + Record actual = deserializer.deserialize(nulls); + + Assert.assertEquals(expected, actual); + + // Check null record as well + Assert.assertNull(deserializer.deserialize(null)); + } +} From ac3f608c6de396ae0cf4b84a4a4570c00d9f2b60 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 2 Dec 2020 17:52:41 +0100 Subject: [PATCH 2/6] Addressed Marton's comments --- .../apache/iceberg/mr/hive/Deserializer.java | 26 ++++++++++--------- .../IcebergTimestampObjectInspector.java | 2 +- .../iceberg/mr/hive/TestDeserializer.java | 16 ++++++++++++ 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java index f0ac57dfc8b3..cd90f3434521 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java @@ -39,21 +39,21 @@ import org.apache.iceberg.types.Types; class Deserializer { - private FiledDeserializer mainDeserializer; + private FieldDeserializer fieldDeserializer; Deserializer(Schema schema, ObjectInspector fieldInspector) throws SerDeException { - this.mainDeserializer = deserializer(schema.asStruct(), fieldInspector); + this.fieldDeserializer = deserializer(schema.asStruct(), fieldInspector); } Record deserialize(Object data) { - return (Record) mainDeserializer.value(data); + return (Record) fieldDeserializer.value(data); } - private interface FiledDeserializer { + private interface FieldDeserializer { Object value(Object object); } - private static FiledDeserializer deserializer(Type type, ObjectInspector fieldInspector) throws SerDeException { + private static FieldDeserializer deserializer(Type type, ObjectInspector fieldInspector) throws SerDeException { switch (type.typeId()) { case BOOLEAN: return o -> ((BooleanObjectInspector) fieldInspector).get(o); @@ -87,20 +87,20 @@ private static FiledDeserializer deserializer(Type type, ObjectInspector fieldIn } } - private static class StructDeserializer implements FiledDeserializer { - private final FiledDeserializer[] filedDeserializers; + private static class StructDeserializer implements FieldDeserializer { + private final FieldDeserializer[] fieldDeserializers; private final StructObjectInspector fieldInspector; private final Types.StructType type; private StructDeserializer(Types.StructType type, StructObjectInspector fieldInspector) throws SerDeException { List structFields = fieldInspector.getAllStructFieldRefs(); List nestedFields = type.fields(); - this.filedDeserializers = new FiledDeserializer[structFields.size()]; + this.fieldDeserializers = new FieldDeserializer[structFields.size()]; this.fieldInspector = fieldInspector; this.type = type; - for (int i = 0; i < filedDeserializers.length; i++) { - filedDeserializers[i] = + for (int i = 0; i < fieldDeserializers.length; i++) { + fieldDeserializers[i] = deserializer(nestedFields.get(i).type(), structFields.get(i).getFieldObjectInspector()); } } @@ -114,10 +114,12 @@ public Record value(Object object) { List data = fieldInspector.getStructFieldsDataAsList(object); Record result = GenericRecord.create(type); - for (int i = 0; i < filedDeserializers.length; i++) { + for (int i = 0; i < fieldDeserializers.length; i++) { Object fieldValue = data.get(i); if (fieldValue != null) { - result.set(i, filedDeserializers[i].value(fieldValue)); + result.set(i, fieldDeserializers[i].value(fieldValue)); + } else { + result.set(i, null); } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java index 98a66b1cd06f..b3102d9f5629 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java @@ -38,7 +38,7 @@ LocalDateTime toLocalDateTime(Object o) { } @Override - public Object getIcebergObject(Object o) { + public OffsetDateTime getIcebergObject(Object o) { return o == null ? null : OffsetDateTime.ofInstant(((TimestampWritable) o).getTimestamp().toInstant(), ZoneId.of("UTC")); } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java index 4871c3610602..432a6b91f613 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java @@ -26,9 +26,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -96,4 +98,18 @@ public void testNullDeserialize() throws SerDeException { // Check null record as well Assert.assertNull(deserializer.deserialize(null)); } + + @Test(expected = SerDeException.class) + public void testSerDeException() throws SerDeException { + Schema unsupported = new Schema( + optional(1, "time_type", Types.TimeType.get()) + ); + StandardStructObjectInspector objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("time_type"), + Arrays.asList( + PrimitiveObjectInspectorFactory.writableStringObjectInspector + )); + + new Deserializer(unsupported, objectInspector); + } } From 2eea35667e1e157525808a8248302b670a6b2884 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 2 Dec 2020 20:23:16 +0100 Subject: [PATCH 3/6] Checkstyle --- .../test/java/org/apache/iceberg/mr/hive/TestDeserializer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java index 432a6b91f613..98ada8a50af7 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java @@ -26,11 +26,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.types.Types; import org.junit.Assert; From 294abb0921525db40b07826a0fd98b46b7177383 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Sun, 6 Dec 2020 20:43:02 +0100 Subject: [PATCH 4/6] Using the Visitor for going through the Schema --- .../apache/iceberg/mr/hive/Deserializer.java | 228 ++++++++++++------ .../iceberg/mr/hive/HiveIcebergTestUtils.java | 13 +- .../iceberg/mr/hive/TestDeserializer.java | 106 ++++++-- 3 files changed, 252 insertions(+), 95 deletions(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java index cd90f3434521..a2a4cf16c94c 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java @@ -19,111 +19,191 @@ package org.apache.iceberg.mr.hive; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; -import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergWriteObjectInspector; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.schema.SchemaWithPartnerVisitor; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; + class Deserializer { private FieldDeserializer fieldDeserializer; - Deserializer(Schema schema, ObjectInspector fieldInspector) throws SerDeException { - this.fieldDeserializer = deserializer(schema.asStruct(), fieldInspector); + static class Builder { + private Schema schema; + private ObjectInspector inspector; + + Builder schema(Schema mainSchema) { + this.schema = mainSchema; + return this; + } + + Builder inspector(ObjectInspector mainInspector) { + this.inspector = mainInspector; + return this; + } + + Deserializer build() { + return new Deserializer(schema, inspector); + } } Record deserialize(Object data) { return (Record) fieldDeserializer.value(data); } - private interface FieldDeserializer { - Object value(Object object); + private Deserializer(Schema schema, ObjectInspector fieldInspector) { + this.fieldDeserializer = DeserializerVisitor.visit(schema, fieldInspector); } - private static FieldDeserializer deserializer(Type type, ObjectInspector fieldInspector) throws SerDeException { - switch (type.typeId()) { - case BOOLEAN: - return o -> ((BooleanObjectInspector) fieldInspector).get(o); - case INTEGER: - return o -> ((IntObjectInspector) fieldInspector).get(o); - case LONG: - return o -> ((LongObjectInspector) fieldInspector).get(o); - case FLOAT: - return o -> ((FloatObjectInspector) fieldInspector).get(o); - case DOUBLE: - return o -> ((DoubleObjectInspector) fieldInspector).get(o); - case STRING: - return o -> ((StringObjectInspector) fieldInspector).getPrimitiveJavaObject(o); - case UUID: - // TODO: This will not work with Parquet. Parquet UUID expect byte[], others are expecting UUID - return o -> UUID.fromString(((StringObjectInspector) fieldInspector).getPrimitiveJavaObject(o)); - case DATE: - case TIMESTAMP: - case FIXED: - case BINARY: - case DECIMAL: - // Iceberg specific conversions - return o -> ((IcebergWriteObjectInspector) fieldInspector).getIcebergObject(o); - case STRUCT: - return new StructDeserializer((Types.StructType) type, (StructObjectInspector) fieldInspector); - case LIST: - case MAP: - case TIME: - default: - throw new SerDeException("Unsupported column type: " + type); + private static class DeserializerVisitor extends SchemaWithPartnerVisitor { + + public static FieldDeserializer visit(Schema schema, ObjectInspector objectInspector) { + return visit(schema, objectInspector, new DeserializerVisitor(), new PartnerObjectInspectorByNameAccessors()); } - } - private static class StructDeserializer implements FieldDeserializer { - private final FieldDeserializer[] fieldDeserializers; - private final StructObjectInspector fieldInspector; - private final Types.StructType type; - - private StructDeserializer(Types.StructType type, StructObjectInspector fieldInspector) throws SerDeException { - List structFields = fieldInspector.getAllStructFieldRefs(); - List nestedFields = type.fields(); - this.fieldDeserializers = new FieldDeserializer[structFields.size()]; - this.fieldInspector = fieldInspector; - this.type = type; - - for (int i = 0; i < fieldDeserializers.length; i++) { - fieldDeserializers[i] = - deserializer(nestedFields.get(i).type(), structFields.get(i).getFieldObjectInspector()); - } + @Override + public FieldDeserializer schema(Schema schema, ObjectInspector inspector, FieldDeserializer deserializer) { + return deserializer; } @Override - public Record value(Object object) { - if (object == null) { - return null; + public FieldDeserializer field(NestedField field, ObjectInspector inspector, FieldDeserializer deserializer) { + return deserializer; + } + + @Override + public FieldDeserializer primitive(PrimitiveType type, ObjectInspector inspector) { + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + // Generic conversions where Iceberg and Hive are using the same java object + return o -> ((PrimitiveObjectInspector) inspector).getPrimitiveJavaObject(o); + case UUID: + // TODO: This will not work with Parquet. Parquet UUID expect byte[], others are expecting UUID + return o -> UUID.fromString(((StringObjectInspector) inspector).getPrimitiveJavaObject(o)); + case DATE: + case TIMESTAMP: + case FIXED: + case BINARY: + case DECIMAL: + // Iceberg specific conversions + return o -> ((IcebergWriteObjectInspector) inspector).getIcebergObject(o); + case TIME: + default: + throw new IllegalArgumentException("Unsupported column type: " + type); } + } - List data = fieldInspector.getStructFieldsDataAsList(object); - Record result = GenericRecord.create(type); + @Override + public FieldDeserializer struct(StructType type, ObjectInspector inspector, List deserializers) { + return o -> { + if (o == null) { + return null; + } + + List data = ((StructObjectInspector) inspector).getStructFieldsDataAsList(o); + Record result = GenericRecord.create(type); - for (int i = 0; i < fieldDeserializers.length; i++) { - Object fieldValue = data.get(i); - if (fieldValue != null) { - result.set(i, fieldDeserializers[i].value(fieldValue)); - } else { - result.set(i, null); + for (int i = 0; i < deserializers.size(); i++) { + Object fieldValue = data.get(i); + if (fieldValue != null) { + result.set(i, deserializers.get(i).value(fieldValue)); + } else { + result.set(i, null); + } } - } - return result; + return result; + }; + } + + @Override + public FieldDeserializer list(ListType listTypeInfo, ObjectInspector inspector, FieldDeserializer deserializer) { + return o -> { + if (o == null) { + return null; + } + + List result = new ArrayList<>(); + ListObjectInspector listInspector = (ListObjectInspector) inspector; + + for (Object val : listInspector.getList(o)) { + result.add(deserializer.value(val)); + } + + return result; + }; + } + + @Override + public FieldDeserializer map(MapType mapType, ObjectInspector inspector, FieldDeserializer keyDeserializer, + FieldDeserializer valueDeserializer) { + return o -> { + if (o == null) { + return null; + } + + Map result = new HashMap<>(); + MapObjectInspector mapObjectInspector = (MapObjectInspector) inspector; + + for (Map.Entry entry : mapObjectInspector.getMap(o).entrySet()) { + result.put(keyDeserializer.value(entry.getKey()), valueDeserializer.value(entry.getValue())); + } + return result; + }; + } + } + + private static class PartnerObjectInspectorByNameAccessors + implements SchemaWithPartnerVisitor.PartnerAccessors { + + @Override + public ObjectInspector fieldPartner(ObjectInspector inspector, int fieldId, String name) { + StructObjectInspector fieldInspector = (StructObjectInspector) inspector; + return fieldInspector.getStructFieldRef(name).getFieldObjectInspector(); } + + @Override + public ObjectInspector mapKeyPartner(ObjectInspector inspector) { + MapObjectInspector fieldInspector = (MapObjectInspector) inspector; + return fieldInspector.getMapKeyObjectInspector(); + } + + @Override + public ObjectInspector mapValuePartner(ObjectInspector inspector) { + MapObjectInspector fieldInspector = (MapObjectInspector) inspector; + return fieldInspector.getMapValueObjectInspector(); + } + + @Override + public ObjectInspector listElementPartner(ObjectInspector inspector) { + ListObjectInspector fieldInspector = (ListObjectInspector) inspector; + return fieldInspector.getListElementObjectInspector(); + } + } + + private interface FieldDeserializer { + Object value(Object object); } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index df301fc2482a..b04ef10d4e49 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -50,6 +50,7 @@ import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDecimalObjectInspector; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.UUIDUtil; import org.junit.Assert; @@ -137,11 +138,11 @@ public static Record getNullTestRecord() { } public static List valuesForTestRecord(Record record) { - ByteBuffer byteBuffer = record.get(11, ByteBuffer.class); - byte[] bytes = new byte[byteBuffer.remaining()]; - byteBuffer.mark(); - byteBuffer.get(bytes); - byteBuffer.reset(); +// ByteBuffer byteBuffer = record.get(11, ByteBuffer.class); +// byte[] bytes = new byte[byteBuffer.remaining()]; +// byteBuffer.mark(); +// byteBuffer.get(bytes); +// byteBuffer.reset(); return Arrays.asList( new BooleanWritable(Boolean.TRUE), @@ -157,7 +158,7 @@ public static List valuesForTestRecord(Record record) { new Text(record.get(8, String.class)), new Text(record.get(9, UUID.class).toString()), new BytesWritable(record.get(10, byte[].class)), - new BytesWritable(bytes), + new BytesWritable(ByteBuffers.toByteArray(record.get(11, ByteBuffer.class))), new HiveDecimalWritable(HiveDecimal.create(record.get(12, BigDecimal.class))) ); } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java index 98ada8a50af7..aa1b68048c51 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java @@ -20,12 +20,15 @@ package org.apache.iceberg.mr.hive; import java.util.Arrays; -import org.apache.hadoop.hive.serde2.SerDeException; +import java.util.Collections; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; @@ -51,10 +54,12 @@ public class TestDeserializer { PrimitiveObjectInspectorFactory.writableStringObjectInspector )); - @Test - public void testSimpleDeserialize() throws SerDeException { - Deserializer deserializer = new Deserializer(CUSTOMER_SCHEMA, CUSTOMER_OBJECT_INSPECTOR); + public void testStructDeserialize() { + Deserializer deserializer = new Deserializer.Builder() + .schema(CUSTOMER_SCHEMA) + .inspector(CUSTOMER_OBJECT_INSPECTOR) + .build(); Record expected = GenericRecord.create(CUSTOMER_SCHEMA); expected.set(0, 1L); @@ -66,12 +71,74 @@ public void testSimpleDeserialize() throws SerDeException { } @Test - public void testDeserializeEverySupportedType() throws SerDeException { - // No test yet for Hive3 (Date/Timestamp creation) - Assume.assumeFalse(MetastoreUtil.hive3PresentOnClasspath()); + public void testMapDeserialize() { + Schema schema = new Schema( + optional(1, "map_type", Types.MapType.ofOptional(2, 3, + Types.LongType.get(), + Types.StringType.get() + )) + ); + + ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("map_type"), + Arrays.asList( + ObjectInspectorFactory.getStandardMapObjectInspector( + PrimitiveObjectInspectorFactory.writableLongObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector + ) + )); + + Deserializer deserializer = new Deserializer.Builder() + .schema(schema) + .inspector(inspector) + .build(); + + Record expected = GenericRecord.create(schema); + expected.set(0, Collections.singletonMap(1L, "Taylor")); - Deserializer deserializer = new Deserializer(HiveIcebergTestUtils.FULL_SCHEMA, - HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR); + MapWritable map = new MapWritable(); + map.put(new LongWritable(1L), new Text("Taylor")); + Object[] data = new Object[] { map }; + Record actual = deserializer.deserialize(data); + + Assert.assertEquals(expected, actual); + } + + @Test + public void testListDeserialize() { + Schema schema = new Schema( + optional(1, "list_type", Types.ListType.ofOptional(2, Types.LongType.get())) + ); + + ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("list_type"), + Arrays.asList( + ObjectInspectorFactory.getStandardListObjectInspector( + PrimitiveObjectInspectorFactory.writableLongObjectInspector) + )); + + Deserializer deserializer = new Deserializer.Builder() + .schema(schema) + .inspector(inspector) + .build(); + + Record expected = GenericRecord.create(schema); + expected.set(0, Collections.singletonList(1L)); + + Object[] data = new Object[] { new Object[] { new LongWritable(1L) } }; + Record actual = deserializer.deserialize(data); + + Assert.assertEquals(expected, actual); + } + + @Test + public void testDeserializeEverySupportedType() { + Assume.assumeFalse("No test yet for Hive3 (Date/Timestamp creation)", MetastoreUtil.hive3PresentOnClasspath()); + + Deserializer deserializer = new Deserializer.Builder() + .schema(HiveIcebergTestUtils.FULL_SCHEMA) + .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR) + .build(); Record expected = HiveIcebergTestUtils.getTestRecord(false); Record actual = deserializer.deserialize(HiveIcebergTestUtils.valuesForTestRecord(expected)); @@ -80,9 +147,11 @@ public void testDeserializeEverySupportedType() throws SerDeException { } @Test - public void testNullDeserialize() throws SerDeException { - Deserializer deserializer = new Deserializer(HiveIcebergTestUtils.FULL_SCHEMA, - HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR); + public void testNullDeserialize() { + Deserializer deserializer = new Deserializer.Builder() + .schema(HiveIcebergTestUtils.FULL_SCHEMA) + .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR) + .build(); Record expected = HiveIcebergTestUtils.getNullTestRecord(); @@ -97,8 +166,8 @@ public void testNullDeserialize() throws SerDeException { Assert.assertNull(deserializer.deserialize(null)); } - @Test(expected = SerDeException.class) - public void testSerDeException() throws SerDeException { + @Test + public void testUnsupportedType() { Schema unsupported = new Schema( optional(1, "time_type", Types.TimeType.get()) ); @@ -108,6 +177,13 @@ public void testSerDeException() throws SerDeException { PrimitiveObjectInspectorFactory.writableStringObjectInspector )); - new Deserializer(unsupported, objectInspector); + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Unsupported column type", () -> { + new Deserializer.Builder() + .schema(unsupported) + .inspector(objectInspector) + .build(); + } + ); } } From 37fd4fd86badf1d234bc03859b7b480332f49375 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Sun, 6 Dec 2020 22:05:55 +0100 Subject: [PATCH 5/6] Renamed IcebergWriteObjectInspector to WriteObjectInspector --- .../main/java/org/apache/iceberg/mr/hive/Deserializer.java | 4 ++-- .../serde/objectinspector/IcebergBinaryObjectInspector.java | 6 +++--- .../serde/objectinspector/IcebergDateObjectInspector.java | 4 ++-- .../objectinspector/IcebergDecimalObjectInspector.java | 4 ++-- .../objectinspector/IcebergTimestampObjectInspector.java | 6 +++--- ...gWriteObjectInspector.java => WriteObjectInspector.java} | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) rename mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/{IcebergWriteObjectInspector.java => WriteObjectInspector.java} (90%) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java index a2a4cf16c94c..d3490159972d 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java @@ -33,7 +33,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergWriteObjectInspector; +import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector; import org.apache.iceberg.schema.SchemaWithPartnerVisitor; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types.ListType; @@ -108,7 +108,7 @@ public FieldDeserializer primitive(PrimitiveType type, ObjectInspector inspector case BINARY: case DECIMAL: // Iceberg specific conversions - return o -> ((IcebergWriteObjectInspector) inspector).getIcebergObject(o); + return o -> ((WriteObjectInspector) inspector).convert(o); case TIME: default: throw new IllegalArgumentException("Unsupported column type: " + type); diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java index 60f89eb11bc9..de6a26f4a469 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java @@ -28,7 +28,7 @@ import org.apache.iceberg.util.ByteBuffers; public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements BinaryObjectInspector, IcebergWriteObjectInspector { + implements BinaryObjectInspector, WriteObjectInspector { private static final IcebergBinaryObjectInspector BYTE_ARRAY = new IcebergBinaryObjectInspector() { @Override @@ -37,7 +37,7 @@ byte[] toByteArray(Object o) { } @Override - public byte[] getIcebergObject(Object o) { + public byte[] convert(Object o) { return o == null ? null : ((BytesWritable) o).getBytes(); } }; @@ -49,7 +49,7 @@ byte[] toByteArray(Object o) { } @Override - public ByteBuffer getIcebergObject(Object o) { + public ByteBuffer convert(Object o) { return o == null ? null : ByteBuffer.wrap(((BytesWritable) o).getBytes()); } }; diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java index 3a3e51b5131d..8959780b078d 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java @@ -28,7 +28,7 @@ import org.apache.iceberg.util.DateTimeUtil; public final class IcebergDateObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements DateObjectInspector, IcebergWriteObjectInspector { + implements DateObjectInspector, WriteObjectInspector { private static final IcebergDateObjectInspector INSTANCE = new IcebergDateObjectInspector(); @@ -66,7 +66,7 @@ public Object copyObject(Object o) { } @Override - public LocalDate getIcebergObject(Object o) { + public LocalDate convert(Object o) { return o == null ? null : ((DateWritable) o).get().toLocalDate(); } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java index 47b4e156fa68..fa023d289004 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java @@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public final class IcebergDecimalObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements HiveDecimalObjectInspector, IcebergWriteObjectInspector { + implements HiveDecimalObjectInspector, WriteObjectInspector { private static final Cache CACHE = Caffeine.newBuilder() .expireAfterAccess(10, TimeUnit.MINUTES) @@ -79,7 +79,7 @@ public Object copyObject(Object o) { } @Override - public BigDecimal getIcebergObject(Object o) { + public BigDecimal convert(Object o) { return o == null ? null : ((HiveDecimalWritable) o).getHiveDecimal().bigDecimalValue(); } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java index b3102d9f5629..1f40aa8d962a 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; public abstract class IcebergTimestampObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements TimestampObjectInspector, IcebergWriteObjectInspector { + implements TimestampObjectInspector, WriteObjectInspector { private static final IcebergTimestampObjectInspector INSTANCE_WITH_ZONE = new IcebergTimestampObjectInspector() { @Override @@ -38,7 +38,7 @@ LocalDateTime toLocalDateTime(Object o) { } @Override - public OffsetDateTime getIcebergObject(Object o) { + public OffsetDateTime convert(Object o) { return o == null ? null : OffsetDateTime.ofInstant(((TimestampWritable) o).getTimestamp().toInstant(), ZoneId.of("UTC")); } @@ -51,7 +51,7 @@ LocalDateTime toLocalDateTime(Object o) { } @Override - public LocalDateTime getIcebergObject(Object o) { + public LocalDateTime convert(Object o) { return o == null ? null : ((TimestampWritable) o).getTimestamp().toLocalDateTime(); } }; diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergWriteObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java similarity index 90% rename from mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergWriteObjectInspector.java rename to mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java index b92dd8a07ae8..a7ac2acb62bd 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergWriteObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java @@ -19,6 +19,6 @@ package org.apache.iceberg.mr.hive.serde.objectinspector; -public interface IcebergWriteObjectInspector { - Object getIcebergObject(Object value); +public interface WriteObjectInspector { + Object convert(Object value); } From b8d5a64ea1d1a3fca6b635118dea32ab2ee9e9da Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 8 Dec 2020 20:02:51 +0100 Subject: [PATCH 6/6] Handling Hive Schema problem and some javadoc --- .../apache/iceberg/mr/hive/Deserializer.java | 64 ++++++++++++++++++- .../iceberg/mr/hive/TestDeserializer.java | 24 +++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java index d3490159972d..b59db22fd3fd 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.iceberg.Schema; @@ -45,6 +46,10 @@ class Deserializer { private FieldDeserializer fieldDeserializer; + /** + * Builder to create a Deserializer instance. + * Requires an Iceberg Schema and the Hive ObjectInspector for converting the data. + */ static class Builder { private Schema schema; private ObjectInspector inspector; @@ -64,6 +69,11 @@ Deserializer build() { } } + /** + * Deserializes the Hive result object to an Iceberg record using the provided ObjectInspectors. + * @param data The Hive data to deserialize + * @return The resulting Iceberg Record + */ Record deserialize(Object data) { return (Record) fieldDeserializer.value(data); } @@ -75,7 +85,8 @@ private Deserializer(Schema schema, ObjectInspector fieldInspector) { private static class DeserializerVisitor extends SchemaWithPartnerVisitor { public static FieldDeserializer visit(Schema schema, ObjectInspector objectInspector) { - return visit(schema, objectInspector, new DeserializerVisitor(), new PartnerObjectInspectorByNameAccessors()); + return visit(schema, new FixNameMappingObjectInspector(schema, objectInspector), new DeserializerVisitor(), + new PartnerObjectInspectorByNameAccessors()); } @Override @@ -206,4 +217,55 @@ public ObjectInspector listElementPartner(ObjectInspector inspector) { private interface FieldDeserializer { Object value(Object object); } + + /** + * Hive query results schema column names do not match the target Iceberg column names. + * Instead we have to rely on the column order. To keep the other parts of the code generic we fix this with a + * wrapper around the ObjectInspector. This wrapper uses the Iceberg schema column names instead of the Hive column + * names for {@link #getStructFieldRef(String) getStructFieldRef} + */ + private static class FixNameMappingObjectInspector extends StructObjectInspector { + private final StructObjectInspector innerInspector; + private final Map nameMap; + + private FixNameMappingObjectInspector(Schema schema, ObjectInspector inspector) { + this.nameMap = new HashMap<>(schema.columns().size()); + this.innerInspector = (StructObjectInspector) inspector; + List fields = innerInspector.getAllStructFieldRefs(); + + for (int i = 0; i < schema.columns().size(); ++i) { + nameMap.put(schema.columns().get(i).name(), fields.get(i)); + } + } + + @Override + public List getAllStructFieldRefs() { + return innerInspector.getAllStructFieldRefs(); + } + + @Override + public StructField getStructFieldRef(String fieldName) { + return nameMap.get(fieldName); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + return innerInspector.getStructFieldData(data, fieldRef); + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return innerInspector.getStructFieldsDataAsList(data); + } + + @Override + public String getTypeName() { + return innerInspector.getTypeName(); + } + + @Override + public Category getCategory() { + return innerInspector.getCategory(); + } + } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java index aa1b68048c51..76f7cf52698b 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java @@ -54,6 +54,30 @@ public class TestDeserializer { PrimitiveObjectInspectorFactory.writableStringObjectInspector )); + @Test + public void testSchemaDeserialize() { + StandardStructObjectInspector schemaObjectInspector = + ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList("0:col1", "1:col2"), + Arrays.asList( + PrimitiveObjectInspectorFactory.writableLongObjectInspector, + PrimitiveObjectInspectorFactory.writableStringObjectInspector + )); + + Deserializer deserializer = new Deserializer.Builder() + .schema(CUSTOMER_SCHEMA) + .inspector(schemaObjectInspector) + .build(); + + Record expected = GenericRecord.create(CUSTOMER_SCHEMA); + expected.set(0, 1L); + expected.set(1, "Bob"); + + Record actual = deserializer.deserialize(new Object[] { new LongWritable(1L), new Text("Bob") }); + + Assert.assertEquals(expected, actual); + } + @Test public void testStructDeserialize() { Deserializer deserializer = new Deserializer.Builder()