From 9104949e6bd7c62a3203550b2d003c4849fc2875 Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Tue, 23 Jun 2020 10:24:37 -0700 Subject: [PATCH 01/12] Skip columns without iceberg ids on schema conversion --- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 133 ++++++++++-------- 1 file changed, 76 insertions(+), 57 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 970f891956cd..1c0d2ad16330 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -214,10 +213,14 @@ public static Schema convert(TypeDescription orcSchema) { "Error in ORC file, children fields and names do not match."); List icebergFields = Lists.newArrayListWithExpectedSize(children.size()); - AtomicInteger lastColumnId = new AtomicInteger(getMaxIcebergId(orcSchema)); for (int i = 0; i < children.size(); i++) { - icebergFields.add(convertOrcToIceberg(children.get(i), childrenNames.get(i), - lastColumnId::incrementAndGet)); + final TypeDescription child = children.get(i); + final String childName = childrenNames.get(i); + icebergID(children.get(i)).flatMap(id -> convertOrcToIceberg(child, childName, id)).ifPresent(icebergFields::add); + } + + if (icebergFields.size() == 0) { + throw new IllegalArgumentException("ORC schema has no Iceberg mappings"); } return new Schema(icebergFields); @@ -403,114 +406,130 @@ private static Types.NestedField getIcebergType(int icebergID, String name, Type Types.NestedField.optional(icebergID, name, type); } - private static Types.NestedField convertOrcToIceberg(TypeDescription orcType, String name, - TypeUtil.NextID nextID) { - - final int icebergID = icebergID(orcType).orElseGet(nextID::get); + private static Optional convertOrcToIceberg(TypeDescription orcType, String name, int icebergID) { final boolean isRequired = isRequired(orcType); + Types.NestedField foundField = null; switch (orcType.getCategory()) { case BOOLEAN: - return getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired); + break; case BYTE: case SHORT: case INT: - return getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired); + break; case LONG: String longAttributeValue = orcType.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE); LongType longType = longAttributeValue == null ? LongType.LONG : LongType.valueOf(longAttributeValue); switch (longType) { case TIME: - return getIcebergType(icebergID, name, Types.TimeType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.TimeType.get(), isRequired); + break; case LONG: - return getIcebergType(icebergID, name, Types.LongType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.LongType.get(), isRequired); + break; default: throw new IllegalStateException("Invalid Long type found in ORC type attribute"); } + break; case FLOAT: - return getIcebergType(icebergID, name, Types.FloatType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.FloatType.get(), isRequired); + break; case DOUBLE: - return getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired); + break; case STRING: case CHAR: case VARCHAR: - return getIcebergType(icebergID, name, Types.StringType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.StringType.get(), isRequired); + break; case BINARY: String binaryAttributeValue = orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE); BinaryType binaryType = binaryAttributeValue == null ? BinaryType.BINARY : BinaryType.valueOf(binaryAttributeValue); switch (binaryType) { case UUID: - return getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired); + break; case FIXED: int fixedLength = Integer.parseInt(orcType.getAttributeValue(ICEBERG_FIELD_LENGTH)); - return getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired); + foundField = getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired); + break; case BINARY: - return getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired); + break; default: throw new IllegalStateException("Invalid Binary type found in ORC type attribute"); } + break; case DATE: - return getIcebergType(icebergID, name, Types.DateType.get(), isRequired); + foundField = getIcebergType(icebergID, name, Types.DateType.get(), isRequired); + break; case TIMESTAMP: - return getIcebergType(icebergID, name, Types.TimestampType.withoutZone(), isRequired); + foundField = getIcebergType(icebergID, name, Types.TimestampType.withoutZone(), isRequired); + break; case TIMESTAMP_INSTANT: - return getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired); + foundField = getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired); + break; case DECIMAL: - return getIcebergType(icebergID, name, - Types.DecimalType.of(orcType.getPrecision(), orcType.getScale()), + foundField = getIcebergType(icebergID, name, Types.DecimalType.of(orcType.getPrecision(), orcType.getScale()), isRequired); - case STRUCT: { + break; + case STRUCT: List fieldNames = orcType.getFieldNames(); List fieldTypes = orcType.getChildren(); List fields = new ArrayList<>(fieldNames.size()); for (int c = 0; c < fieldNames.size(); ++c) { String childName = fieldNames.get(c); TypeDescription type = fieldTypes.get(c); - Types.NestedField field = convertOrcToIceberg(type, childName, nextID); - fields.add(field); + Optional childId = icebergID(type); + childId.flatMap(integer -> convertOrcToIceberg(type, childName, integer)).ifPresent(fields::add); } - return getIcebergType(icebergID, name, Types.StructType.of(fields), isRequired); - } - case LIST: { + if (fields.size() > 0) { + foundField = getIcebergType(icebergID, name, Types.StructType.of(fields), isRequired); + } + break; + case LIST: TypeDescription elementType = orcType.getChildren().get(0); - Types.NestedField element = convertOrcToIceberg(elementType, "element", nextID); - - Types.ListType listTypeWithElem = isRequired(elementType) ? - Types.ListType.ofRequired(element.fieldId(), element.type()) : - Types.ListType.ofOptional(element.fieldId(), element.type()); - return isRequired ? - Types.NestedField.required(icebergID, name, listTypeWithElem) : - Types.NestedField.optional(icebergID, name, listTypeWithElem); - } + Optional elementId = icebergID(elementType); + if (elementId.isPresent()) { + Optional element = convertOrcToIceberg(elementType, "element", elementId.get()); + if (element.isPresent()) { + Types.NestedField foundElement = element.get(); + Types.ListType listTypeWithElem = isRequired(elementType) ? + Types.ListType.ofRequired(foundElement.fieldId(), foundElement.type()) : + Types.ListType.ofOptional(foundElement.fieldId(), foundElement.type()); + foundField = getIcebergType(icebergID, name, listTypeWithElem, isRequired); + } + } + break; case MAP: { TypeDescription keyType = orcType.getChildren().get(0); - Types.NestedField key = convertOrcToIceberg(keyType, "key", nextID); + Optional keyId = icebergID(keyType); TypeDescription valueType = orcType.getChildren().get(1); - Types.NestedField value = convertOrcToIceberg(valueType, "value", nextID); - - Types.MapType mapTypeWithKV = isRequired(valueType) ? - Types.MapType.ofRequired(key.fieldId(), value.fieldId(), key.type(), value.type()) : - Types.MapType.ofOptional(key.fieldId(), value.fieldId(), key.type(), value.type()); - - return getIcebergType(icebergID, name, mapTypeWithKV, isRequired); + Optional valueId = icebergID(valueType); + if (keyId.isPresent() && valueId.isPresent()) { + Optional key = convertOrcToIceberg(keyType, "key", keyId.get()); + Optional value = convertOrcToIceberg(valueType, "value", valueId.get()); + + if (key.isPresent() && value.isPresent()) { + Types.NestedField foundKey = key.get(); + Types.NestedField foundValue = value.get(); + Types.MapType mapTypeWithKV = isRequired(valueType) ? + Types.MapType.ofRequired(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()) : + Types.MapType.ofOptional(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()); + foundField = getIcebergType(icebergID, name, mapTypeWithKV, isRequired); + } + } + break; } default: // We don't have an answer for union types. throw new IllegalArgumentException("Can't handle " + orcType); } - } - - private static int getMaxIcebergId(TypeDescription originalOrcSchema) { - int maxId = icebergID(originalOrcSchema).orElse(0); - final List children = Optional.ofNullable(originalOrcSchema.getChildren()) - .orElse(Collections.emptyList()); - for (TypeDescription child : children) { - maxId = Math.max(maxId, getMaxIcebergId(child)); - } - - return maxId; + return Optional.ofNullable(foundField); } /** From 2970838dbc70a2eec8ce454c435aacfa0ac4c82c Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Thu, 25 Jun 2020 11:30:08 -0700 Subject: [PATCH 02/12] Fix NPE on null list in ORC type visitor --- .../java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java index 175cddf5c189..53b0c9f2fdeb 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java @@ -44,7 +44,7 @@ public static T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV Types.ListType list = iType != null ? iType.asListType() : null; return visitor.list( list, schema, - visit(list.elementType(), schema.getChildren().get(0), visitor)); + visit(list != null ? list.elementType() : null, schema.getChildren().get(0), visitor)); case MAP: Types.MapType map = iType != null ? iType.asMapType() : null; From e74640c29b1dae917059e2cb201715c4344abce5 Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Thu, 25 Jun 2020 15:07:19 -0700 Subject: [PATCH 03/12] Fix #1055 Use visitor to convert ORC schema to Iceberg This change also skips columns that do not have an Iceberg ID attribute. --- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 266 ++++++++++-------- 1 file changed, 142 insertions(+), 124 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 1c0d2ad16330..79ca046a8952 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -19,12 +19,11 @@ package org.apache.iceberg.orc; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -213,10 +212,10 @@ public static Schema convert(TypeDescription orcSchema) { "Error in ORC file, children fields and names do not match."); List icebergFields = Lists.newArrayListWithExpectedSize(children.size()); - for (int i = 0; i < children.size(); i++) { - final TypeDescription child = children.get(i); - final String childName = childrenNames.get(i); - icebergID(children.get(i)).flatMap(id -> convertOrcToIceberg(child, childName, id)).ifPresent(icebergFields::add); + OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor(icebergToOrcMapping("root", orcSchema)); + + for (TypeDescription child : orcSchema.getChildren()) { + OrcToIcebergVisitor.visit((Type) null, child, schemaConverter).ifPresent(icebergFields::add); } if (icebergFields.size() == 0) { @@ -406,130 +405,149 @@ private static Types.NestedField getIcebergType(int icebergID, String name, Type Types.NestedField.optional(icebergID, name, type); } - private static Optional convertOrcToIceberg(TypeDescription orcType, String name, int icebergID) { - final boolean isRequired = isRequired(orcType); - Types.NestedField foundField = null; + private static class OrcToIcebergVisitor extends OrcSchemaWithTypeVisitor> { - switch (orcType.getCategory()) { - case BOOLEAN: - foundField = getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired); - break; - case BYTE: - case SHORT: - case INT: - foundField = getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired); - break; - case LONG: - String longAttributeValue = orcType.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE); - LongType longType = longAttributeValue == null ? LongType.LONG : LongType.valueOf(longAttributeValue); - switch (longType) { - case TIME: - foundField = getIcebergType(icebergID, name, Types.TimeType.get(), isRequired); - break; - case LONG: - foundField = getIcebergType(icebergID, name, Types.LongType.get(), isRequired); - break; - default: - throw new IllegalStateException("Invalid Long type found in ORC type attribute"); - } - break; - case FLOAT: - foundField = getIcebergType(icebergID, name, Types.FloatType.get(), isRequired); - break; - case DOUBLE: - foundField = getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired); - break; - case STRING: - case CHAR: - case VARCHAR: - foundField = getIcebergType(icebergID, name, Types.StringType.get(), isRequired); - break; - case BINARY: - String binaryAttributeValue = orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE); - BinaryType binaryType = binaryAttributeValue == null ? BinaryType.BINARY : - BinaryType.valueOf(binaryAttributeValue); - switch (binaryType) { - case UUID: - foundField = getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired); - break; - case FIXED: - int fixedLength = Integer.parseInt(orcType.getAttributeValue(ICEBERG_FIELD_LENGTH)); - foundField = getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired); - break; - case BINARY: - foundField = getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired); - break; - default: - throw new IllegalStateException("Invalid Binary type found in ORC type attribute"); - } - break; - case DATE: - foundField = getIcebergType(icebergID, name, Types.DateType.get(), isRequired); - break; - case TIMESTAMP: - foundField = getIcebergType(icebergID, name, Types.TimestampType.withoutZone(), isRequired); - break; - case TIMESTAMP_INSTANT: - foundField = getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired); - break; - case DECIMAL: - foundField = getIcebergType(icebergID, name, Types.DecimalType.of(orcType.getPrecision(), orcType.getScale()), - isRequired); - break; - case STRUCT: - List fieldNames = orcType.getFieldNames(); - List fieldTypes = orcType.getChildren(); - List fields = new ArrayList<>(fieldNames.size()); - for (int c = 0; c < fieldNames.size(); ++c) { - String childName = fieldNames.get(c); - TypeDescription type = fieldTypes.get(c); - Optional childId = icebergID(type); - childId.flatMap(integer -> convertOrcToIceberg(type, childName, integer)).ifPresent(fields::add); - } + private final Map icebergToOrcMapping; - if (fields.size() > 0) { - foundField = getIcebergType(icebergID, name, Types.StructType.of(fields), isRequired); - } - break; - case LIST: - TypeDescription elementType = orcType.getChildren().get(0); - Optional elementId = icebergID(elementType); - if (elementId.isPresent()) { - Optional element = convertOrcToIceberg(elementType, "element", elementId.get()); - if (element.isPresent()) { - Types.NestedField foundElement = element.get(); - Types.ListType listTypeWithElem = isRequired(elementType) ? - Types.ListType.ofRequired(foundElement.fieldId(), foundElement.type()) : - Types.ListType.ofOptional(foundElement.fieldId(), foundElement.type()); - foundField = getIcebergType(icebergID, name, listTypeWithElem, isRequired); + OrcToIcebergVisitor(Map icebergToOrcMapping) { + this.icebergToOrcMapping = icebergToOrcMapping; + } + + @Override + public Optional record(Types.StructType iStruct, TypeDescription record, List names, + List> fields) { + boolean isRequired = isRequired(record); + Optional icebergIdOpt = icebergID(record); + if (!icebergIdOpt.isPresent() || fields.size() == 0) { + return Optional.empty(); + } + + return Optional.of(getIcebergType(icebergIdOpt.get(), icebergToOrcMapping.get(icebergIdOpt.get()).name(), + Types.StructType.of(fields.stream().filter(Optional::isPresent).map(Optional::get) + .collect(Collectors.toList())), isRequired)); + } + + @Override + public Optional list(Types.ListType iList, TypeDescription array, + Optional element) { + boolean isRequired = isRequired(array); + Optional icebergIdOpt = icebergID(array); + + if (!icebergIdOpt.isPresent() || !element.isPresent()) { + return Optional.empty(); + } + + Types.NestedField foundElement = element.get(); + Types.ListType listTypeWithElem = isRequired(array.getChildren().get(0)) ? + Types.ListType.ofRequired(foundElement.fieldId(), foundElement.type()) : + Types.ListType.ofOptional(foundElement.fieldId(), foundElement.type()); + return Optional.of(getIcebergType(icebergIdOpt.get(), + icebergToOrcMapping.get(icebergIdOpt.get()).name(), listTypeWithElem, isRequired)); + } + + @Override + public Optional map(Types.MapType iMap, TypeDescription map, Optional key, + Optional value) { + boolean isRequired = isRequired(map); + Optional icebergIdOpt = icebergID(map); + + if (!icebergIdOpt.isPresent() || !key.isPresent() || !value.isPresent()) { + return Optional.empty(); + } + + Types.NestedField foundKey = key.get(); + Types.NestedField foundValue = value.get(); + Types.MapType mapTypeWithKV = isRequired(map.getChildren().get(1)) ? + Types.MapType.ofRequired(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()) : + Types.MapType.ofOptional(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()); + + return Optional.of(getIcebergType(icebergIdOpt.get(), + icebergToOrcMapping.get(icebergIdOpt.get()).name(), mapTypeWithKV, isRequired)); + } + + @Override + public Optional primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + boolean isRequired = isRequired(primitive); + Optional icebergIdOpt = icebergID(primitive); + + if (!icebergIdOpt.isPresent()) { + return Optional.empty(); + } + + final Types.NestedField foundField; + int icebergID = icebergIdOpt.get(); + String name = icebergToOrcMapping.get(icebergID).name(); + switch (primitive.getCategory()) { + case BOOLEAN: + foundField = getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired); + break; + case BYTE: + case SHORT: + case INT: + foundField = getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired); + break; + case LONG: + String longAttributeValue = primitive.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE); + LongType longType = longAttributeValue == null ? LongType.LONG : LongType.valueOf(longAttributeValue); + switch (longType) { + case TIME: + foundField = getIcebergType(icebergID, name, Types.TimeType.get(), isRequired); + break; + case LONG: + foundField = getIcebergType(icebergID, name, Types.LongType.get(), isRequired); + break; + default: + throw new IllegalStateException("Invalid Long type found in ORC type attribute"); } - } - break; - case MAP: { - TypeDescription keyType = orcType.getChildren().get(0); - Optional keyId = icebergID(keyType); - TypeDescription valueType = orcType.getChildren().get(1); - Optional valueId = icebergID(valueType); - if (keyId.isPresent() && valueId.isPresent()) { - Optional key = convertOrcToIceberg(keyType, "key", keyId.get()); - Optional value = convertOrcToIceberg(valueType, "value", valueId.get()); - - if (key.isPresent() && value.isPresent()) { - Types.NestedField foundKey = key.get(); - Types.NestedField foundValue = value.get(); - Types.MapType mapTypeWithKV = isRequired(valueType) ? - Types.MapType.ofRequired(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()) : - Types.MapType.ofOptional(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()); - foundField = getIcebergType(icebergID, name, mapTypeWithKV, isRequired); + break; + case FLOAT: + foundField = getIcebergType(icebergID, name, Types.FloatType.get(), isRequired); + break; + case DOUBLE: + foundField = getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired); + break; + case STRING: + case CHAR: + case VARCHAR: + foundField = getIcebergType(icebergID, name, Types.StringType.get(), isRequired); + break; + case BINARY: + String binaryAttributeValue = primitive.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE); + BinaryType binaryType = binaryAttributeValue == null ? BinaryType.BINARY : + BinaryType.valueOf(binaryAttributeValue); + switch (binaryType) { + case UUID: + foundField = getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired); + break; + case FIXED: + int fixedLength = Integer.parseInt(primitive.getAttributeValue(ICEBERG_FIELD_LENGTH)); + foundField = getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired); + break; + case BINARY: + foundField = getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired); + break; + default: + throw new IllegalStateException("Invalid Binary type found in ORC type attribute"); } - } - break; + break; + case DATE: + foundField = getIcebergType(icebergID, name, Types.DateType.get(), isRequired); + break; + case TIMESTAMP: + foundField = getIcebergType(icebergID, name, Types.TimestampType.withoutZone(), isRequired); + break; + case TIMESTAMP_INSTANT: + foundField = getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired); + break; + case DECIMAL: + foundField = getIcebergType(icebergID, name, + Types.DecimalType.of(primitive.getPrecision(), primitive.getScale()), isRequired); + break; + default: + throw new IllegalArgumentException("Can't handle " + primitive); } - default: - // We don't have an answer for union types. - throw new IllegalArgumentException("Can't handle " + orcType); + return Optional.of(foundField); } - return Optional.ofNullable(foundField); } /** From d66cb470ef45a25c2726bb1255fb3cf4bb981d2a Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Fri, 26 Jun 2020 10:24:17 -0700 Subject: [PATCH 04/12] Add test for skipping non-iceberg columns in ORC --- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 20 ++++------- .../iceberg/orc/OrcSchemaWithTypeVisitor.java | 21 ++++++++--- .../apache/iceberg/orc/TestORCSchemaUtil.java | 36 +++++++++++++++++++ 3 files changed, 60 insertions(+), 17 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 79ca046a8952..ee1210b994c5 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -65,8 +65,8 @@ public TypeDescription type() { } } - private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id"; - private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required"; + static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id"; + static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required"; /** * The name of the ORC {@link TypeDescription} attribute indicating the Iceberg type corresponding to an @@ -213,7 +213,6 @@ public static Schema convert(TypeDescription orcSchema) { List icebergFields = Lists.newArrayListWithExpectedSize(children.size()); OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor(icebergToOrcMapping("root", orcSchema)); - for (TypeDescription child : orcSchema.getChildren()) { OrcToIcebergVisitor.visit((Type) null, child, schemaConverter).ifPresent(icebergFields::add); } @@ -384,12 +383,6 @@ static Optional icebergID(TypeDescription orcType) { .map(Integer::parseInt); } - static int fieldId(TypeDescription orcType) { - String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE); - Preconditions.checkNotNull(idStr, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE); - return Integer.parseInt(idStr); - } - private static boolean isRequired(TypeDescription orcType) { String isRequiredStr = orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE); if (isRequiredStr != null) { @@ -422,9 +415,10 @@ public Optional record(Types.StructType iStruct, TypeDescript return Optional.empty(); } + Types.StructType structType = Types.StructType.of( + fields.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList())); return Optional.of(getIcebergType(icebergIdOpt.get(), icebergToOrcMapping.get(icebergIdOpt.get()).name(), - Types.StructType.of(fields.stream().filter(Optional::isPresent).map(Optional::get) - .collect(Collectors.toList())), isRequired)); + structType, isRequired)); } @Override @@ -461,8 +455,8 @@ public Optional map(Types.MapType iMap, TypeDescription map, Types.MapType.ofRequired(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()) : Types.MapType.ofOptional(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()); - return Optional.of(getIcebergType(icebergIdOpt.get(), - icebergToOrcMapping.get(icebergIdOpt.get()).name(), mapTypeWithKV, isRequired)); + return Optional.of(getIcebergType(icebergIdOpt.get(), icebergToOrcMapping.get(icebergIdOpt.get()).name(), + mapTypeWithKV, isRequired)); } @Override diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java index 53b0c9f2fdeb..98976f55cad7 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java @@ -20,13 +20,18 @@ package org.apache.iceberg.orc; import java.util.List; +import java.util.Optional; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class OrcSchemaWithTypeVisitor { + private static final Logger LOG = LoggerFactory.getLogger(OrcSchemaWithTypeVisitor.class); + public static T visit( org.apache.iceberg.Schema iSchema, TypeDescription schema, OrcSchemaWithTypeVisitor visitor) { return visit(iSchema.asStruct(), schema, visitor); @@ -63,12 +68,20 @@ private static T visitRecord( List fields = record.getChildren(); List names = record.getFieldNames(); List results = Lists.newArrayListWithExpectedSize(fields.size()); - for (TypeDescription field : fields) { - int fieldId = ORCSchemaUtil.fieldId(field); - Types.NestedField iField = struct != null ? struct.field(fieldId) : null; + List includedNames = Lists.newArrayListWithExpectedSize(names.size()); + for (int i = 0; i < fields.size(); ++i) { + TypeDescription field = fields.get(i); + String name = names.get(i); + Optional fieldId = ORCSchemaUtil.icebergID(field); + if (!fieldId.isPresent()) { + LOG.warn("Missing expected '{}' property - skipping field {}", ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, name); + continue; + } + Types.NestedField iField = struct != null ? struct.field(fieldId.get()) : null; results.add(visit(iField != null ? iField.type() : null, field, visitor)); + includedNames.add(name); } - return visitor.record(struct, record, names, results); + return visitor.record(struct, record, includedNames, results); } public T record(Types.StructType iStruct, TypeDescription record, List names, List fields) { diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java index 461e1d4bd930..11cc944051d4 100644 --- a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java +++ b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java @@ -25,6 +25,7 @@ import org.junit.Test; import static org.apache.iceberg.AssertHelpers.assertThrows; +import static org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.junit.Assert.assertEquals; @@ -209,4 +210,39 @@ public void testInvalidTypePromotions() { ORCSchemaUtil.buildOrcProjection(evolveSchema, orcSchema); }); } + + @Test + public void testSkipNonIcebergColumns() { + TypeDescription schema = TypeDescription.createStruct(); + TypeDescription intCol = TypeDescription.createInt(); + intCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "1"); + TypeDescription listCol = TypeDescription + .createList(TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createDate())); + listCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "2"); + schema.addField("intCol", intCol); + schema.addField("listCol", listCol); + TypeDescription stringKey = TypeDescription.createString(); + stringKey.setAttribute(ICEBERG_ID_ATTRIBUTE, "3"); + TypeDescription booleanVal = TypeDescription.createBoolean(); + booleanVal.setAttribute(ICEBERG_ID_ATTRIBUTE, "4"); + TypeDescription mapCol = TypeDescription.createMap(stringKey, booleanVal); + mapCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "5"); + schema.addField("mapCol", mapCol); + + Schema icebergSchema = ORCSchemaUtil.convert(schema); + assertEquals(2, icebergSchema.asStruct().fields().size()); + + TypeDescription structCol = TypeDescription.createStruct(); + structCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "7"); + TypeDescription binaryCol = TypeDescription.createBinary(); + TypeDescription doubleCol = TypeDescription.createDouble(); + doubleCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "6"); + structCol.addField("binaryCol", binaryCol); + structCol.addField("doubleCol", doubleCol); + schema.addField("structCol", structCol); + + Schema icebergSchema2 = ORCSchemaUtil.convert(schema); + assertEquals(3, icebergSchema2.asStruct().fields().size()); + assertEquals(1, icebergSchema2.findField("structCol").type().asStructType().fields().size()); + } } From 170551c045aee18d167f4cef0c71563ab5a21235 Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Mon, 29 Jun 2020 09:47:01 -0700 Subject: [PATCH 05/12] Update javadoc --- orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index ee1210b994c5..0c5691d3f0da 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -200,10 +200,11 @@ private static TypeDescription convert(Integer fieldId, Type type, boolean isReq /** * Convert an ORC schema to an Iceberg schema. This method handles the convertion from the original - * Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC column IDs - * will be assigned following ORCs pre-order ID assignment. + * Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC columns with no + * Iceberg IDs will be ignored and skipped in the conversion. * * @return the Iceberg schema + * @throws IllegalArgumentException if ORC schema has no columns with Iceberg ID attributes */ public static Schema convert(TypeDescription orcSchema) { List children = orcSchema.getChildren(); From 693a7faf2f754dd7c268901e4ff2d392df365c80 Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Mon, 29 Jun 2020 10:47:38 -0700 Subject: [PATCH 06/12] Add OrcSchemaVisitor --- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 18 ++-- .../apache/iceberg/orc/OrcSchemaVisitor.java | 83 +++++++++++++++++++ .../iceberg/orc/OrcSchemaWithTypeVisitor.java | 21 +---- 3 files changed, 99 insertions(+), 23 deletions(-) create mode 100644 orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 0c5691d3f0da..9a1521cc57cc 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -215,7 +215,7 @@ public static Schema convert(TypeDescription orcSchema) { List icebergFields = Lists.newArrayListWithExpectedSize(children.size()); OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor(icebergToOrcMapping("root", orcSchema)); for (TypeDescription child : orcSchema.getChildren()) { - OrcToIcebergVisitor.visit((Type) null, child, schemaConverter).ifPresent(icebergFields::add); + OrcToIcebergVisitor.visit(child, schemaConverter).ifPresent(icebergFields::add); } if (icebergFields.size() == 0) { @@ -384,6 +384,12 @@ static Optional icebergID(TypeDescription orcType) { .map(Integer::parseInt); } + static int fieldId(TypeDescription orcType) { + String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE); + Preconditions.checkNotNull(idStr, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE); + return Integer.parseInt(idStr); + } + private static boolean isRequired(TypeDescription orcType) { String isRequiredStr = orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE); if (isRequiredStr != null) { @@ -399,7 +405,7 @@ private static Types.NestedField getIcebergType(int icebergID, String name, Type Types.NestedField.optional(icebergID, name, type); } - private static class OrcToIcebergVisitor extends OrcSchemaWithTypeVisitor> { + private static class OrcToIcebergVisitor extends OrcSchemaVisitor> { private final Map icebergToOrcMapping; @@ -408,7 +414,7 @@ private static class OrcToIcebergVisitor extends OrcSchemaWithTypeVisitor