From 38dc3f82c1ec693c5a9e7eec0fe59ea3ccfb9648 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Tue, 4 Jun 2019 21:27:04 -0700 Subject: [PATCH 01/15] Add external schema mappings for files written with name-based schemas --- .../java/org/apache/iceberg/avro/Avro.java | 9 +- .../apache/iceberg/avro/AvroSchemaUtil.java | 18 +- .../iceberg/avro/AvroSchemaVisitor.java | 25 ++- .../iceberg/avro/BuildAvroProjection.java | 8 +- .../iceberg/avro/ProjectionDatumReader.java | 8 +- .../org/apache/iceberg/avro/PruneColumns.java | 143 +++++++++++--- .../org/apache/iceberg/avro/RemoveIds.java | 69 +++++++ .../iceberg/avro/TestAvroNameMapping.java | 182 ++++++++++++++++++ .../iceberg/data/avro/IcebergDecoder.java | 2 +- 9 files changed, 425 insertions(+), 39 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/avro/RemoveIds.java create mode 100644 core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 1276e23f7818..760b80588cd5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -38,6 +38,7 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.mapping.NameMapping; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_DEFAULT; @@ -170,6 +171,7 @@ public static class ReadBuilder { private final ClassLoader defaultLoader = Thread.currentThread().getContextClassLoader(); private final InputFile file; private final Map renames = Maps.newLinkedHashMap(); + private NameMapping nameMapping; private boolean reuseContainers = false; private org.apache.iceberg.Schema schema = null; private Function> createReaderFunc = readSchema -> { @@ -223,10 +225,15 @@ public ReadBuilder 
rename(String fullName, String newName) { return this; } + public ReadBuilder nameMapping(NameMapping newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + public AvroIterable build() { Preconditions.checkNotNull(schema, "Schema is required"); return new AvroIterable<>(file, - new ProjectionDatumReader<>(createReaderFunc, schema, renames), + new ProjectionDatumReader<>(createReaderFunc, schema, renames, nameMapping), start, length, reuseContainers); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 3ae089b9d0b6..613236e17bcb 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -29,6 +29,7 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -83,8 +84,8 @@ public static Map convertTypes(Types.StructType type, String name) return ImmutableMap.copyOf(converter.getConversionMap()); } - public static Schema pruneColumns(Schema schema, Set selectedIds) { - return new PruneColumns(selectedIds).rootSchema(schema); + public static Schema pruneColumns(Schema schema, Set selectedIds, NameMapping nameMapping) { + return new PruneColumns(selectedIds, nameMapping).rootSchema(schema); } public static Schema buildAvroProjection(Schema schema, org.apache.iceberg.Schema expected, @@ -196,7 +197,7 @@ static Schema createProjectionMap(String recordName, return LogicalMap.get().addToSchema(Schema.createArray(keyValueRecord)); } - private static int getId(Schema schema, String propertyName) { + static int getId(Schema schema, String propertyName) { if (schema.getType() == UNION) { return getId(fromOption(schema), propertyName); } @@ -207,6 +208,13 @@ 
private static int getId(Schema schema, String propertyName) { return toInt(id); } + static boolean hasProperty(Schema schema, String propertyName) { + if (schema.getType() == UNION) { + return hasProperty(fromOption(schema), propertyName); + } + return schema.getObjectProp(propertyName) != null; + } + public static int getKeyId(Schema schema) { Preconditions.checkArgument(schema.getType() == MAP, "Cannot get map key id for non-map schema: %s", schema); @@ -232,6 +240,10 @@ public static int getFieldId(Schema.Field field) { return toInt(id); } + public static boolean hasFieldId(Schema.Field field) { + return field.getObjectProp(FIELD_ID_PROP) != null; + } + private static int toInt(Object value) { if (value instanceof Number) { return ((Number) value).intValue(); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java index 4bdb96e58f9f..15d670d45fd6 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java @@ -41,7 +41,8 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { List results = Lists.newArrayListWithExpectedSize(fields.size()); for (Schema.Field field : schema.getFields()) { names.add(field.name()); - results.add(visit(field.schema(), visitor)); + T result = visitWithName(field.name(), field.schema(), visitor); + results.add(result); } visitor.recordLevels.pop(); @@ -57,10 +58,14 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { return visitor.union(schema, options); case ARRAY: - return visitor.array(schema, visit(schema.getElementType(), visitor)); + if (schema.getLogicalType() instanceof LogicalMap || AvroSchemaUtil.isKeyValueSchema(schema.getElementType())) { + return visitor.array(schema, visit(schema.getElementType(), visitor)); + } else { + return visitor.array(schema, visitWithName("element", schema.getElementType(), visitor)); + } 
case MAP: - return visitor.map(schema, visit(schema.getValueType(), visitor)); + return visitor.map(schema, visitWithName("value", schema.getValueType(), visitor)); default: return visitor.primitive(schema); @@ -68,6 +73,20 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { } private Deque recordLevels = Lists.newLinkedList(); + private Deque fieldNames = Lists.newLinkedList(); + + protected Deque fieldNames() { + return fieldNames; + } + + private static T visitWithName(String name, Schema schema, AvroSchemaVisitor visitor) { + try { + visitor.fieldNames.addLast(name); + return visit(schema, visitor); + } finally { + visitor.fieldNames.removeLast(); + } + } public T record(Schema record, List names, List fields) { return null; diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index c4ae1678c9ef..c53f4cc79d29 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -165,13 +165,14 @@ public Schema array(Schema array, Supplier element) { try { Schema keyValueSchema = array.getElementType(); Schema.Field keyField = keyValueSchema.getFields().get(0); + Schema.Field keyProjection = element.get().getField("key"); Schema.Field valueField = keyValueSchema.getFields().get(1); Schema.Field valueProjection = element.get().getField("value"); // element was changed, create a new array - if (valueProjection.schema() != valueField.schema()) { + if (keyProjection.schema() != keyField.schema() || valueProjection.schema() != valueField.schema()) { return AvroSchemaUtil.createProjectionMap(keyValueSchema.getFullName(), - AvroSchemaUtil.getFieldId(keyField), keyField.name(), keyField.schema(), + AvroSchemaUtil.getFieldId(keyField), keyField.name(), keyProjection.schema(), AvroSchemaUtil.getFieldId(valueField), valueField.name(), valueProjection.schema()); } else if 
(!(array.getLogicalType() instanceof LogicalMap)) { return AvroSchemaUtil.createProjectionMap(keyValueSchema.getFullName(), @@ -195,6 +196,8 @@ public Schema array(Schema array, Supplier element) { // element was changed, create a new array if (elementSchema != array.getElementType()) { + // TODO: we do not copy field ids here. Probably not required, + // but we do at other places in this class. return Schema.createArray(elementSchema); } @@ -219,6 +222,7 @@ public Schema map(Schema map, Supplier value) { // element was changed, create a new map if (valueSchema != map.getValueType()) { + // TODO: we do not copy field ids here return Schema.createMap(valueSchema); } diff --git a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java index 35a70f16a3c8..4a4b4b1e53ec 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java @@ -26,29 +26,33 @@ import org.apache.avro.Schema; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; +import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.types.TypeUtil; public class ProjectionDatumReader implements DatumReader { private final Function> getReader; private final org.apache.iceberg.Schema expectedSchema; private final Map renames; + private final NameMapping nameMapping; private Schema readSchema = null; private Schema fileSchema = null; private DatumReader wrapped = null; public ProjectionDatumReader(Function> getReader, org.apache.iceberg.Schema expectedSchema, - Map renames) { + Map renames, + NameMapping nameMapping) { this.getReader = getReader; this.expectedSchema = expectedSchema; this.renames = renames; + this.nameMapping = nameMapping; } @Override public void setSchema(Schema newFileSchema) { this.fileSchema = newFileSchema; Set projectedIds = TypeUtil.getProjectedIds(expectedSchema); - Schema 
prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds); + Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds, nameMapping); this.readSchema = AvroSchemaUtil.buildAvroProjection(prunedSchema, expectedSchema, renames); this.wrapped = newDatumReader(); } diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 201c1950588b..61a1ce14afa6 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -26,15 +26,19 @@ import java.util.Map; import java.util.Set; import org.apache.avro.Schema; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; class PruneColumns extends AvroSchemaVisitor { private final Set selectedIds; + private final NameMapping nameMapping; - PruneColumns(Set selectedIds) { + PruneColumns(Set selectedIds, NameMapping nameMapping) { this.selectedIds = selectedIds; + this.nameMapping = nameMapping; } - public Schema rootSchema(Schema record) { + Schema rootSchema(Schema record) { Schema result = visit(record, this); if (result != null) { return result; @@ -47,9 +51,13 @@ public Schema rootSchema(Schema record) { public Schema record(Schema record, List names, List fields) { // Then this should access the record's fields by name List filteredFields = Lists.newArrayListWithExpectedSize(fields.size()); - boolean hasChange = false; for (Schema.Field field : record.getFields()) { - int fieldId = AvroSchemaUtil.getFieldId(field); + Integer fieldId = fieldId(field); + if (fieldId == null) { + // both the schema and the nameMapping does not have field id. We prune this field. + continue; + } + Schema fieldSchema = fields.get(field.pos()); // All primitives are selected by selecting the field, but map and list // types can be selected by projecting the keys, values, or elements. 
@@ -58,21 +66,19 @@ public Schema record(Schema record, List names, List fields) { // case where the converted field is non-null is when a map or list is // selected by lower IDs. if (selectedIds.contains(fieldId)) { - filteredFields.add(copyField(field, field.schema())); + filteredFields.add(copyField(field, field.schema(), fieldId)); } else if (fieldSchema != null) { - hasChange = true; - filteredFields.add(copyField(field, fieldSchema)); + filteredFields.add(copyField(field, fieldSchema, fieldId)); } } - if (hasChange) { + if (filteredFields.size() > 0) { return copyRecord(record, filteredFields); - } else if (filteredFields.size() == record.getFields().size()) { + } else if (record.getFields().isEmpty()) { return record; - } else if (!filteredFields.isEmpty()) { - return copyRecord(record, filteredFields); } + // No fields selected, also record has fields return null; } @@ -100,36 +106,46 @@ public Schema union(Schema union, List options) { } @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") public Schema array(Schema array, Schema element) { - if (array.getLogicalType() instanceof LogicalMap) { + if (array.getLogicalType() instanceof LogicalMap || AvroSchemaUtil.isKeyValueSchema(array.getElementType())) { Schema keyValue = array.getElementType(); - int keyId = AvroSchemaUtil.getFieldId(keyValue.getField("key")); - int valueId = AvroSchemaUtil.getFieldId(keyValue.getField("value")); + Integer keyId = fieldId(keyValue.getField("key")); + Integer valueId = fieldId(keyValue.getField("value")); + if (keyId == null) { + Preconditions.checkState(valueId == null, "Map schema %s has value id but not key id", array); + return null; + } // if either key or value is selected, the whole map must be projected if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { - return array; + return complexMapWithIds(array, keyId, valueId); } else if (element != null) { - if (keyValue.getField("value").schema() != element.getField("value").schema()) { + if 
(keyValue.getField("key").schema() != element.getField("key").schema() || + keyValue.getField("value").schema() != element.getField("value").schema()) { // the value must be a projection return AvroSchemaUtil.createMap( - keyId, keyValue.getField("key").schema(), + keyId, element.getField("key").schema(), valueId, element.getField("value").schema()); } else { - return array; + return complexMapWithIds(array, keyId, valueId); } } } else { - int elementId = AvroSchemaUtil.getElementId(array); + Integer elementId = id(array, AvroSchemaUtil.ELEMENT_ID_PROP, "element"); + if (elementId == null) { + return null; + } + if (selectedIds.contains(elementId)) { - return array; + return arrayWithId(array, elementId); } else if (element != null) { if (element != array.getElementType()) { // the element must be a projection - return Schema.createArray(element); + return arrayWithId(Schema.createArray(element), elementId); } - return array; + return arrayWithId(array, elementId); } } @@ -138,15 +154,20 @@ public Schema array(Schema array, Schema element) { @Override public Schema map(Schema map, Schema value) { - int keyId = AvroSchemaUtil.getKeyId(map); - int valueId = AvroSchemaUtil.getValueId(map); + Integer keyId = id(map, AvroSchemaUtil.KEY_ID_PROP, "key"); + Integer valueId = id(map, AvroSchemaUtil.VALUE_ID_PROP, "value"); + if (keyId == null) { + Preconditions.checkState(valueId == null, "Map schema %s has value-id but not key-id", map); + return null; + } // if either key or value is selected, the whole map must be projected if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { - return map; + // Assign ids. 
Ids may not always be present in the schema + return mapWithIds(map, keyId, valueId); } else if (value != null) { if (value != map.getValueType()) { // the value must be a projection - return Schema.createMap(value); + return mapWithIds(Schema.createMap(value), keyId, valueId); } return map; } @@ -154,12 +175,75 @@ public Schema map(Schema map, Schema value) { return null; } + private Schema arrayWithId(Schema array, Integer elementId) { + if (!AvroSchemaUtil.hasProperty(array, AvroSchemaUtil.ELEMENT_ID_PROP)) { + Schema result = Schema.createArray(array.getElementType()); + result.addProp(AvroSchemaUtil.ELEMENT_ID_PROP, elementId); + return result; + } + return array; + } + + private Schema complexMapWithIds(Schema map, Integer keyId, Integer valueId) { + Schema keyValue = map.getElementType(); + if (!AvroSchemaUtil.hasFieldId(keyValue.getField("key"))) { + return AvroSchemaUtil.createMap( + keyId, keyValue.getField("key").schema(), + valueId, keyValue.getField("value").schema()); + } + return map; + } + + private Schema mapWithIds(Schema map, Integer keyId, Integer valueId) { + if (!AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.KEY_ID_PROP)) { + Schema result = Schema.createMap(map.getValueType()); + result.addProp(AvroSchemaUtil.KEY_ID_PROP, keyId); + result.addProp(AvroSchemaUtil.VALUE_ID_PROP, valueId); + return result; + } + return map; + } + @Override public Schema primitive(Schema primitive) { // primitives are not selected directly return null; } + private Integer id(Schema schema, String propertyName, String mappedName) { + if (AvroSchemaUtil.hasProperty(schema, propertyName)) { + return AvroSchemaUtil.getId(schema, propertyName); + } else { + MappedField mappedField = mappedField(mappedName); + if (mappedField != null) { + return mappedField.id(); + } else { + return null; + } + } + } + + private Integer fieldId(Schema.Field field) { + if (AvroSchemaUtil.hasFieldId(field)) { + return AvroSchemaUtil.getFieldId(field); + } else { + MappedField mappedField 
= mappedField(field.name()); + if (mappedField != null) { + return mappedField.id(); + } else { + return null; + } + } + } + + private MappedField mappedField(String fieldName) { + Preconditions.checkState(nameMapping != null, + "Cannot find mapped field for field name %s. NameMapping is null", fieldName); + List fieldNames = Lists.newArrayList(fieldNames()); + fieldNames.add(fieldName); + return nameMapping.find(fieldNames); + } + private static Schema copyRecord(Schema record, List newFields) { Schema copy = Schema.createRecord(record.getName(), record.getDoc(), record.getNamespace(), record.isError(), newFields); @@ -171,7 +255,7 @@ private static Schema copyRecord(Schema record, List newFields) { return copy; } - private static Schema.Field copyField(Schema.Field field, Schema newSchema) { + private static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) { Schema.Field copy = new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal(), field.order()); @@ -179,6 +263,11 @@ private static Schema.Field copyField(Schema.Field field, Schema newSchema) { copy.addProp(prop.getKey(), prop.getValue()); } + if (!AvroSchemaUtil.hasFieldId(field)) { + // field may not have a fieldId if the fieldId was fetched from nameMapping + copy.addProp(AvroSchemaUtil.FIELD_ID_PROP, fieldId); + } + return copy; } } diff --git a/core/src/test/java/org/apache/iceberg/avro/RemoveIds.java b/core/src/test/java/org/apache/iceberg/avro/RemoveIds.java new file mode 100644 index 000000000000..83937c47aa98 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/RemoveIds.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.Lists; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; + +class RemoveIds extends AvroSchemaVisitor { + @Override + public Schema record(Schema record, List names, List types) { + List fields = record.getFields(); + int length = fields.size(); + List newFields = Lists.newArrayListWithExpectedSize(length); + for (int i = 0; i < length; i += 1) { + newFields.add(copyField(fields.get(i), types.get(i))); + } + return AvroSchemaUtil.copyRecord(record, newFields, null); + } + + @Override + public Schema map(Schema map, Schema valueType) { + return Schema.createMap(valueType); + } + + @Override + public Schema array(Schema array, Schema element) { + return Schema.createArray(element); + } + + @Override + public Schema primitive(Schema primitive) { + return Schema.create(primitive.getType()); + } + + @Override + public Schema union(Schema union, List options) { + return Schema.createUnion(options); + } + + private static Schema.Field copyField(Schema.Field field, Schema newSchema) { + Schema.Field copy = new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal(), field.order()); + for (Map.Entry prop : field.getObjectProps().entrySet()) { + String key = prop.getKey(); + if (key != AvroSchemaUtil.FIELD_ID_PROP) { + copy.addProp(key, prop.getValue()); + } + } + return copy; 
+ } +} diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java new file mode 100644 index 000000000000..3c254a985e76 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestAvroNameMapping extends TestAvroReadProjection { + @Override + protected GenericData.Record writeAndRead(String desc, + Schema writeSchema, + Schema readSchema, + GenericData.Record inputRecord) throws IOException { + + // Use all existing TestAvroReadProjection tests to verify that + // we get the same projected (Avro) read schema whether we use + // NameMapping together with file schema without field-ids or we + // use a file schema having field-ids + GenericData.Record record = super.writeAndRead(desc, writeSchema, readSchema, inputRecord); + org.apache.avro.Schema expected = record.getSchema(); + org.apache.avro.Schema actual = projectWithNameMapping(writeSchema, readSchema, MappingUtil.create(writeSchema)); + Assert.assertEquals(expected, actual); + return record; + } + + @Test + public void testNameMappingProjections() { + Schema fileSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "long", Types.FloatType.get()) + ) + ))); + + // Table mapping does not project `locations` map + NameMapping nameMapping = MappingUtil.create(new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()))); + + // Table read schema projects `locations` map's value + Schema readSchema = new Schema( + Types.NestedField.required(0, "id", 
Types.LongType.get()), + Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(2, "long", Types.FloatType.get()) + ) + ))); + + check(fileSchema, readSchema, nameMapping); + } + + @Test + public void testMissingFields() { + Schema fileSchema = new Schema( + Types.NestedField.required(19, "x", Types.IntegerType.get()), + Types.NestedField.optional(18, "y", Types.IntegerType.get())); + + // table mapping not projecting a required field 'x' + NameMapping nameMapping = MappingUtil.create(new Schema( + Types.NestedField.optional(18, "y", Types.IntegerType.get()))); + + Schema readSchema = fileSchema; + AssertHelpers.assertThrows("Missing required field in nameMapping", + IllegalArgumentException.class, "Missing required field: x", + // In this case, pruneColumns result is an empty record + () -> projectWithNameMapping(fileSchema, readSchema, nameMapping)); + } + + @Test + public void testComplexMapKeys() { + Schema fileSchema = new Schema( + Types.NestedField.required(5, "locations", Types.MapType.ofOptional(6, 7, + Types.StructType.of( + Types.NestedField.required(3, "k1", Types.StringType.get()), + Types.NestedField.optional(4, "k2", Types.StringType.get()) + ), + Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.optional(2, "long", Types.FloatType.get()) + ) + ))); + + // project a subset of the map's key and value columns in NameMapping + NameMapping nameMapping = MappingUtil.create(new Schema( + Types.NestedField.required(5, "locations", Types.MapType.ofOptional(6, 7, + Types.StructType.of( + Types.NestedField.required(3, "k1", Types.StringType.get()) + ), + Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()) + ) + )))); + + Schema readSchema = new Schema( + Types.NestedField.required(5, "locations", Types.MapType.ofOptional(6, 7, + Types.StructType.of( + Types.NestedField.required(3, 
"k1", Types.StringType.get()), + Types.NestedField.optional(4, "k2", Types.StringType.get()) + ), + Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.optional(2, "long", Types.FloatType.get()) + ) + ))); + + check(fileSchema, readSchema, nameMapping); + } + + //TODO: add tests on + // 1. aliases in namemapping + // 2. arrays + + private static org.apache.avro.Schema project(Schema writeSchema, Schema readSchema) { + // Build a read schema when file schema has field ids + org.apache.avro.Schema avroFileSchema = AvroSchemaUtil.convert(writeSchema.asStruct(), "table"); + Set projectedIds = TypeUtil.getProjectedIds(readSchema); + org.apache.avro.Schema prunedFileSchema = AvroSchemaUtil.pruneColumns(avroFileSchema, projectedIds, null); + return AvroSchemaUtil.buildAvroProjection(prunedFileSchema, readSchema, Collections.emptyMap()); + } + + private static org.apache.avro.Schema projectWithNameMapping( + Schema writeSchema, Schema readSchema, NameMapping nameMapping) { + // Build a read schema when file schema does not have field ids . The field ids are provided by `nameMapping` + org.apache.avro.Schema avroFileSchema = removeIds(writeSchema); + Set projectedIds = TypeUtil.getProjectedIds(readSchema); + org.apache.avro.Schema prunedFileSchema = AvroSchemaUtil.pruneColumns(avroFileSchema, projectedIds, nameMapping); + return AvroSchemaUtil.buildAvroProjection(prunedFileSchema, readSchema, Collections.emptyMap()); + } + + private void check(Schema writeSchema, Schema readSchema, NameMapping nameMapping) { + org.apache.avro.Schema expected = project(writeSchema, readSchema); + org.apache.avro.Schema actual = projectWithNameMapping(writeSchema, readSchema, nameMapping); + + // projected/read schema built using external mapping should match projected/read schema + // built with file schema having field ids + + // `BuildAvroProjection` can skip adding fields ids in some cases. 
+ // e.g when creating a map if the value is modified. This leads to test failures + // For now I've removed ids to perform equality testing. + Assert.assertEquals(removeIds(expected), removeIds(actual)); + // Projected/read schema built using external mapping will always match expected Iceberg schema + Assert.assertEquals(removeIds(actual), removeIds(AvroSchemaUtil.convert(readSchema, "table"))); + } + + private static org.apache.avro.Schema removeIds(Schema schema) { + return AvroSchemaVisitor.visit(AvroSchemaUtil.convert(schema.asStruct(), "table"), new RemoveIds()); + } + + private static org.apache.avro.Schema removeIds(org.apache.avro.Schema schema) { + return AvroSchemaVisitor.visit(schema, new RemoveIds()); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java b/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java index c6b1aaf8c219..97a50e9c29e7 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java +++ b/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java @@ -166,7 +166,7 @@ private static class RawDecoder extends MessageDecoder.BaseDecoder { * @param writeSchema the schema used to decode buffers */ private RawDecoder(org.apache.iceberg.Schema readSchema, org.apache.avro.Schema writeSchema) { - this.reader = new ProjectionDatumReader<>(DataReader::create, readSchema, ImmutableMap.of()); + this.reader = new ProjectionDatumReader<>(DataReader::create, readSchema, ImmutableMap.of(), null); this.reader.setSchema(writeSchema); } From b11349be28bde2a5a07e9a36ed91eb557a76a166 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 27 Sep 2019 18:40:29 -0700 Subject: [PATCH 02/15] Remove todos from BuildAvroProjection --- .../main/java/org/apache/iceberg/avro/BuildAvroProjection.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java 
index c53f4cc79d29..d4e7add70928 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -196,8 +196,6 @@ public Schema array(Schema array, Supplier element) { // element was changed, create a new array if (elementSchema != array.getElementType()) { - // TODO: we do not copy field ids here. Probably not required, - // but we do at other places in this class. return Schema.createArray(elementSchema); } @@ -222,7 +220,6 @@ public Schema map(Schema map, Supplier value) { // element was changed, create a new map if (valueSchema != map.getValueType()) { - // TODO: we do not copy field ids here return Schema.createMap(valueSchema); } From a458efa96c04bf27270761b6c3f4a6f6a15dbbde Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Tue, 1 Oct 2019 15:26:58 -0700 Subject: [PATCH 03/15] Address review comments --- .../org/apache/iceberg/avro/BuildAvroProjection.java | 5 ++--- .../java/org/apache/iceberg/avro/PruneColumns.java | 12 +++++++++--- .../org/apache/iceberg/avro/TestAvroNameMapping.java | 5 +++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index d4e7add70928..c4ae1678c9ef 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -165,14 +165,13 @@ public Schema array(Schema array, Supplier element) { try { Schema keyValueSchema = array.getElementType(); Schema.Field keyField = keyValueSchema.getFields().get(0); - Schema.Field keyProjection = element.get().getField("key"); Schema.Field valueField = keyValueSchema.getFields().get(1); Schema.Field valueProjection = element.get().getField("value"); // element was changed, create a new array - if (keyProjection.schema() != keyField.schema() || valueProjection.schema() 
!= valueField.schema()) { + if (valueProjection.schema() != valueField.schema()) { return AvroSchemaUtil.createProjectionMap(keyValueSchema.getFullName(), - AvroSchemaUtil.getFieldId(keyField), keyField.name(), keyProjection.schema(), + AvroSchemaUtil.getFieldId(keyField), keyField.name(), keyField.schema(), AvroSchemaUtil.getFieldId(valueField), valueField.name(), valueProjection.schema()); } else if (!(array.getLogicalType() instanceof LogicalMap)) { return AvroSchemaUtil.createProjectionMap(keyValueSchema.getFullName(), diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 61a1ce14afa6..eb2f33fb7051 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -51,6 +51,7 @@ Schema rootSchema(Schema record) { public Schema record(Schema record, List names, List fields) { // Then this should access the record's fields by name List filteredFields = Lists.newArrayListWithExpectedSize(fields.size()); + boolean hasChange = false; for (Schema.Field field : record.getFields()) { Integer fieldId = fieldId(field); if (fieldId == null) { @@ -58,6 +59,10 @@ public Schema record(Schema record, List names, List fields) { continue; } + if (!AvroSchemaUtil.hasFieldId(field)) { + hasChange = true; + } + Schema fieldSchema = fields.get(field.pos()); // All primitives are selected by selecting the field, but map and list // types can be selected by projecting the keys, values, or elements. 
@@ -72,13 +77,14 @@ public Schema record(Schema record, List names, List fields) { } } - if (filteredFields.size() > 0) { + if (hasChange) { return copyRecord(record, filteredFields); - } else if (record.getFields().isEmpty()) { + } else if (filteredFields.size() == record.getFields().size()) { return record; + } else if (!filteredFields.isEmpty()) { + return copyRecord(record, filteredFields); } - // No fields selected, also record has fields return null; } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index 3c254a985e76..08ba9830d2db 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -110,11 +110,12 @@ public void testComplexMapKeys() { ) ))); - // project a subset of the map's key and value columns in NameMapping + // project a subset of the map's value columns in NameMapping NameMapping nameMapping = MappingUtil.create(new Schema( Types.NestedField.required(5, "locations", Types.MapType.ofOptional(6, 7, Types.StructType.of( - Types.NestedField.required(3, "k1", Types.StringType.get()) + Types.NestedField.required(3, "k1", Types.StringType.get()), + Types.NestedField.optional(4, "k2", Types.StringType.get()) ), Types.StructType.of( Types.NestedField.required(1, "lat", Types.FloatType.get()) From 0b5028f7b770bfea5b5d7eea12f08eccb5954822 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Tue, 1 Oct 2019 15:29:46 -0700 Subject: [PATCH 04/15] Address review comments --- .../src/main/java/org/apache/iceberg/avro/PruneColumns.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index eb2f33fb7051..4d47e695897e 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ 
b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -28,8 +28,12 @@ import org.apache.avro.Schema; import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.NameMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class PruneColumns extends AvroSchemaVisitor { + private static final Logger LOG = LoggerFactory.getLogger(PruneColumns.class); + private final Set selectedIds; private final NameMapping nameMapping; @@ -119,7 +123,7 @@ public Schema array(Schema array, Schema element) { Integer keyId = fieldId(keyValue.getField("key")); Integer valueId = fieldId(keyValue.getField("value")); if (keyId == null) { - Preconditions.checkState(valueId == null, "Map schema %s has value id but not key id", array); + LOG.warn("Map schema %s has value id but not key id", array); return null; } From a574e60bf4c6466c3b93ec66dde37a07a43c61eb Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Tue, 1 Oct 2019 16:00:12 -0700 Subject: [PATCH 05/15] Fix style errors --- core/src/main/java/org/apache/iceberg/avro/PruneColumns.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 4d47e695897e..7508b683636b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -123,7 +123,7 @@ public Schema array(Schema array, Schema element) { Integer keyId = fieldId(keyValue.getField("key")); Integer valueId = fieldId(keyValue.getField("value")); if (keyId == null) { - LOG.warn("Map schema %s has value id but not key id", array); + LOG.warn("Map schema {} has value id but not key id", array); return null; } From 41f282d7874ad09923645803b650c67e611b4630 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Tue, 1 Oct 2019 16:27:37 -0700 Subject: [PATCH 06/15] Check error to warning if value is projected but not key --- 
core/src/main/java/org/apache/iceberg/avro/PruneColumns.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 7508b683636b..dd575df545a5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -167,7 +167,7 @@ public Schema map(Schema map, Schema value) { Integer keyId = id(map, AvroSchemaUtil.KEY_ID_PROP, "key"); Integer valueId = id(map, AvroSchemaUtil.VALUE_ID_PROP, "value"); if (keyId == null) { - Preconditions.checkState(valueId == null, "Map schema %s has value-id but not key-id", map); + LOG.warn("Map schema {} has value-id but not key-id", map); return null; } // if either key or value is selected, the whole map must be projected From 66d7a034b730ba8642a421fc30231330acbd8073 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Tue, 1 Oct 2019 17:01:59 -0700 Subject: [PATCH 07/15] Fix typo when only projecting map keys --- .../main/java/org/apache/iceberg/avro/PruneColumns.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index dd575df545a5..b6a17ff33f53 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -123,7 +123,9 @@ public Schema array(Schema array, Schema element) { Integer keyId = fieldId(keyValue.getField("key")); Integer valueId = fieldId(keyValue.getField("value")); if (keyId == null) { - LOG.warn("Map schema {} has value id but not key id", array); + if (valueId != null) { + LOG.warn("Map schema {} has value id but not key id", array); + } return null; } @@ -167,7 +169,9 @@ public Schema map(Schema map, Schema value) { Integer keyId = id(map, AvroSchemaUtil.KEY_ID_PROP, 
"key"); Integer valueId = id(map, AvroSchemaUtil.VALUE_ID_PROP, "value"); if (keyId == null) { - LOG.warn("Map schema {} has value-id but not key-id", map); + if (valueId != null) { + LOG.warn("Map schema {} has value-id but not key-id", map); + } return null; } // if either key or value is selected, the whole map must be projected From 69749b1682f9c2f02c6c81238a60391998336b0b Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 4 Oct 2019 01:30:46 -0700 Subject: [PATCH 08/15] Add Tests address review comments --- .../org/apache/iceberg/avro/PruneColumns.java | 23 +++-- .../iceberg/avro/TestAvroNameMapping.java | 94 +++++++++++++++++-- 2 files changed, 100 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index b6a17ff33f53..10ad6dc4037b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -122,19 +122,19 @@ public Schema array(Schema array, Schema element) { Schema keyValue = array.getElementType(); Integer keyId = fieldId(keyValue.getField("key")); Integer valueId = fieldId(keyValue.getField("value")); - if (keyId == null) { - if (valueId != null) { - LOG.warn("Map schema {} has value id but not key id", array); + if (keyId == null || valueId == null) { + if (keyId != null || valueId != null) { + LOG.warn("Map schema {} should have both key and value ids set or both unset", array); } return null; } - // if either key or value is selected, the whole map must be projected if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { return complexMapWithIds(array, keyId, valueId); } else if (element != null) { if (keyValue.getField("key").schema() != element.getField("key").schema() || keyValue.getField("value").schema() != element.getField("value").schema()) { + // key schemas can be different if new field ids were assigned to them // the value must 
be a projection return AvroSchemaUtil.createMap( keyId, element.getField("key").schema(), @@ -168,15 +168,16 @@ public Schema array(Schema array, Schema element) { public Schema map(Schema map, Schema value) { Integer keyId = id(map, AvroSchemaUtil.KEY_ID_PROP, "key"); Integer valueId = id(map, AvroSchemaUtil.VALUE_ID_PROP, "value"); - if (keyId == null) { - if (valueId != null) { - LOG.warn("Map schema {} has value-id but not key-id", map); + if (keyId == null || valueId == null) { + if (keyId != null || valueId != null) { + LOG.warn("Map schema {} should have both key and value ids set or both unset", map); } return null; } // if either key or value is selected, the whole map must be projected if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { - // Assign ids. Ids may not always be present in the schema + // Assign ids. Ids may not always be present in the schema, + // e.g if we are reading data not written by Iceberg writers return mapWithIds(map, keyId, valueId); } else if (value != null) { if (value != map.getValueType()) { @@ -200,7 +201,8 @@ private Schema arrayWithId(Schema array, Integer elementId) { private Schema complexMapWithIds(Schema map, Integer keyId, Integer valueId) { Schema keyValue = map.getElementType(); - if (!AvroSchemaUtil.hasFieldId(keyValue.getField("key"))) { + if (!AvroSchemaUtil.hasFieldId(keyValue.getField("key")) || + !AvroSchemaUtil.hasFieldId(keyValue.getField("value"))) { return AvroSchemaUtil.createMap( keyId, keyValue.getField("key").schema(), valueId, keyValue.getField("value").schema()); @@ -209,7 +211,8 @@ private Schema complexMapWithIds(Schema map, Integer keyId, Integer valueId) { } private Schema mapWithIds(Schema map, Integer keyId, Integer valueId) { - if (!AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.KEY_ID_PROP)) { + if (!AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.KEY_ID_PROP) || + !AvroSchemaUtil.hasProperty(map, AvroSchemaUtil.VALUE_ID_PROP)) { Schema result = 
Schema.createMap(map.getValueType()); result.addProp(AvroSchemaUtil.KEY_ID_PROP, keyId); result.addProp(AvroSchemaUtil.VALUE_ID_PROP, valueId); diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index 08ba9830d2db..1a7eb7e65193 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -22,9 +22,13 @@ import java.io.IOException; import java.util.Collections; import java.util.Set; + +import com.google.common.collect.Lists; import org.apache.avro.generic.GenericData; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.MappedFields; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.types.TypeUtil; @@ -137,9 +141,85 @@ public void testComplexMapKeys() { check(fileSchema, readSchema, nameMapping); } - //TODO: add tests on - // 1. aliases in namemapping - // 2. arrays + @Test + public void testArrayProjections() { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(22, "points", + Types.ListType.ofOptional(21, Types.StructType.of( + Types.NestedField.required(19, "x", Types.IntegerType.get()), + Types.NestedField.optional(18, "y", Types.IntegerType.get()) + )) + ) + ); + + NameMapping nameMapping = MappingUtil.create(new Schema( + // Optional array field missing. 
Will be filled in + // using default values using read schema + Types.NestedField.required(0, "id", Types.LongType.get()))); + + Schema readSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(22, "points", + Types.ListType.ofOptional(21, Types.StructType.of( + Types.NestedField.required(19, "x", Types.IntegerType.get()) + )) + ) + ); + + check(writeSchema, readSchema, nameMapping); + + // points array is partially projected + nameMapping = MappingUtil.create( new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(22, "points", + Types.ListType.ofOptional(21, Types.StructType.of( + Types.NestedField.required(19, "x", Types.IntegerType.get()))) + ) + )); + + check(writeSchema, readSchema, nameMapping); + } + + @Test + public void testAliases() { + Schema fileSchema = new Schema( + Types.NestedField.optional(22, "points", + Types.ListType.ofOptional(21, Types.StructType.of( + Types.NestedField.required(19, "x", Types.IntegerType.get()) + )) + ) + ); + + NameMapping nameMapping = NameMapping.of(MappedFields.of( + MappedField.of(22, "points", MappedFields.of( + MappedField.of(19, Lists.newArrayList("x", "y", "z")) + )))); + + + Schema readSchema = new Schema( + Types.NestedField.optional(22, "points", + Types.ListType.ofOptional(21, Types.StructType.of( + // x renamed to y + Types.NestedField.required(19, "y", Types.IntegerType.get()) + )) + ) + ); + + check(fileSchema, readSchema, nameMapping); + + readSchema = new Schema( + Types.NestedField.optional(22, "points", + Types.ListType.ofOptional(21, Types.StructType.of( + // x renamed to z + Types.NestedField.required(19, "z", Types.IntegerType.get()) + )) + ) + ); + + check(fileSchema, readSchema, nameMapping); + } + private static org.apache.avro.Schema project(Schema writeSchema, Schema readSchema) { // Build a read schema when file schema has field ids @@ -162,12 +242,12 @@ private void check(Schema writeSchema, 
Schema readSchema, NameMapping nameMappin org.apache.avro.Schema expected = project(writeSchema, readSchema); org.apache.avro.Schema actual = projectWithNameMapping(writeSchema, readSchema, nameMapping); - // projected/read schema built using external mapping should match projected/read schema - // built with file schema having field ids + // projected/read schema built using external mapping should match + // projected/read schema built with file schema having field ids // `BuildAvroProjection` can skip adding fields ids in some cases. - // e.g when creating a map if the value is modified. This leads to test failures - // For now I've removed ids to perform equality testing. + // e.g when creating a map if the value is modified. This leads to + // test failures. For now I've removed ids to perform equality testing. Assert.assertEquals(removeIds(expected), removeIds(actual)); // Projected/read schema built using external mapping will always match expected Iceberg schema Assert.assertEquals(removeIds(actual), removeIds(AvroSchemaUtil.convert(readSchema, "table"))); From c8cd96de46b44c649c505f4a61a91554e82d700d Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 4 Oct 2019 01:38:41 -0700 Subject: [PATCH 09/15] Fix checkstyle errors --- core/src/main/java/org/apache/iceberg/avro/PruneColumns.java | 2 ++ .../java/org/apache/iceberg/avro/TestAvroNameMapping.java | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 10ad6dc4037b..fe08a457e69c 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -128,6 +128,7 @@ public Schema array(Schema array, Schema element) { } return null; } + // if either key or value is selected, the whole map must be projected if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { return 
complexMapWithIds(array, keyId, valueId); @@ -174,6 +175,7 @@ public Schema map(Schema map, Schema value) { } return null; } + // if either key or value is selected, the whole map must be projected if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { // Assign ids. Ids may not always be present in the schema, diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index 1a7eb7e65193..082938ba9e02 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -19,11 +19,10 @@ package org.apache.iceberg.avro; +import com.google.common.collect.Lists; import java.io.IOException; import java.util.Collections; import java.util.Set; - -import com.google.common.collect.Lists; import org.apache.avro.generic.GenericData; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; @@ -170,7 +169,7 @@ public void testArrayProjections() { check(writeSchema, readSchema, nameMapping); // points array is partially projected - nameMapping = MappingUtil.create( new Schema( + nameMapping = MappingUtil.create(new Schema( Types.NestedField.required(0, "id", Types.LongType.get()), Types.NestedField.optional(22, "points", Types.ListType.ofOptional(21, Types.StructType.of( From a227800311dd2a5590cf8b1f5e5d44052b79ca60 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 4 Oct 2019 07:36:38 -0700 Subject: [PATCH 10/15] Move 'id' methods to AvroSchemaUtil --- .../apache/iceberg/avro/AvroSchemaUtil.java | 71 +++++++++++++++++-- .../org/apache/iceberg/avro/PruneColumns.java | 47 ++---------- 2 files changed, 70 insertions(+), 48 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 613236e17bcb..b047a301b538 100644 --- 
a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,6 +30,7 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; @@ -197,15 +199,31 @@ static Schema createProjectionMap(String recordName, return LogicalMap.get().addToSchema(Schema.createArray(keyValueRecord)); } - static int getId(Schema schema, String propertyName) { + + static Integer getId(Schema schema, String propertyName) { + Integer id = getId(schema, propertyName, null, null); + Preconditions.checkNotNull(id, "Missing expected '%s' property", propertyName); + return id; + } + + static Integer getId(Schema schema, String propertyName, + NameMapping nameMapping, List names) { if (schema.getType() == UNION) { - return getId(fromOption(schema), propertyName); + return getId(fromOption(schema), propertyName, nameMapping, names); } Object id = schema.getObjectProp(propertyName); - Preconditions.checkNotNull(id, "Missing expected '%s' property", propertyName); - - return toInt(id); + if (id != null) { + return toInt(id); + } else { + Preconditions.checkState(nameMapping != null, + "Schema does not have field id and name mapping not specified"); + MappedField mappedField = nameMapping.find(names); + if (mappedField != null) { + return mappedField.id(); + } + return null; + } } static boolean hasProperty(Schema schema, String propertyName) { @@ -221,23 +239,62 @@ public static int getKeyId(Schema schema) { return getId(schema, KEY_ID_PROP); } 
+ static Integer getKeyId(Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { + Preconditions.checkArgument(schema.getType() == MAP, + "Cannot get map key id for non-map schema: %s", schema); + List names = Lists.newArrayList(parentFieldNames); + names.add("key"); + return getId(schema, KEY_ID_PROP, nameMapping, names); + } + public static int getValueId(Schema schema) { Preconditions.checkArgument(schema.getType() == MAP, "Cannot get map value id for non-map schema: %s", schema); return getId(schema, VALUE_ID_PROP); } + static Integer getValueId(Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { + Preconditions.checkArgument(schema.getType() == MAP, + "Cannot get map value id for non-map schema: %s", schema); + List names = Lists.newArrayList(parentFieldNames); + names.add("value"); + return getId(schema, VALUE_ID_PROP, nameMapping, names); + } + public static int getElementId(Schema schema) { Preconditions.checkArgument(schema.getType() == ARRAY, "Cannot get array element id for non-array schema: %s", schema); return getId(schema, ELEMENT_ID_PROP); } + static Integer getElementId(Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { + Preconditions.checkArgument(schema.getType() == ARRAY, + "Cannot get array element id for non-array schema: %s", schema); + List names = Lists.newArrayList(parentFieldNames); + names.add("element"); + return getId(schema, ELEMENT_ID_PROP, nameMapping, names); + } + public static int getFieldId(Schema.Field field) { - Object id = field.getObjectProp(FIELD_ID_PROP); + Integer id = getFieldId(field, null, null); Preconditions.checkNotNull(id, "Missing expected '%s' property", FIELD_ID_PROP); + return id; + } - return toInt(id); + static Integer getFieldId(Schema.Field field, NameMapping nameMapping, Iterable parentFieldNames) { + Object id = field.getObjectProp(FIELD_ID_PROP); + if (id != null) { + return toInt(id); + } else { + Preconditions.checkState(nameMapping != null, "Field 
schema does not have id and name mapping not specified"); + List names = Lists.newArrayList(parentFieldNames); + names.add(field.name()); + MappedField mappedField = nameMapping.find(names); + if (mappedField != null) { + return mappedField.id(); + } + } + return null; } public static boolean hasFieldId(Schema.Field field) { diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index fe08a457e69c..a15356335e62 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Set; import org.apache.avro.Schema; -import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.NameMapping; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +56,7 @@ public Schema record(Schema record, List names, List fields) { List filteredFields = Lists.newArrayListWithExpectedSize(fields.size()); boolean hasChange = false; for (Schema.Field field : record.getFields()) { - Integer fieldId = fieldId(field); + Integer fieldId = AvroSchemaUtil.getFieldId(field, nameMapping, fieldNames()); if (fieldId == null) { // both the schema and the nameMapping does not have field id. We prune this field. 
continue; @@ -120,8 +119,8 @@ public Schema union(Schema union, List options) { public Schema array(Schema array, Schema element) { if (array.getLogicalType() instanceof LogicalMap || AvroSchemaUtil.isKeyValueSchema(array.getElementType())) { Schema keyValue = array.getElementType(); - Integer keyId = fieldId(keyValue.getField("key")); - Integer valueId = fieldId(keyValue.getField("value")); + Integer keyId = AvroSchemaUtil.getFieldId(keyValue.getField("key"), nameMapping, fieldNames()); + Integer valueId = AvroSchemaUtil.getFieldId(keyValue.getField("value"), nameMapping, fieldNames()); if (keyId == null || valueId == null) { if (keyId != null || valueId != null) { LOG.warn("Map schema {} should have both key and value ids set or both unset", array); @@ -146,7 +145,7 @@ public Schema array(Schema array, Schema element) { } } else { - Integer elementId = id(array, AvroSchemaUtil.ELEMENT_ID_PROP, "element"); + Integer elementId = AvroSchemaUtil.getElementId(array, nameMapping, fieldNames()); if (elementId == null) { return null; } @@ -167,8 +166,8 @@ public Schema array(Schema array, Schema element) { @Override public Schema map(Schema map, Schema value) { - Integer keyId = id(map, AvroSchemaUtil.KEY_ID_PROP, "key"); - Integer valueId = id(map, AvroSchemaUtil.VALUE_ID_PROP, "value"); + Integer keyId = AvroSchemaUtil.getKeyId(map, nameMapping, fieldNames()); + Integer valueId = AvroSchemaUtil.getValueId(map, nameMapping, fieldNames()); if (keyId == null || valueId == null) { if (keyId != null || valueId != null) { LOG.warn("Map schema {} should have both key and value ids set or both unset", map); @@ -229,40 +228,6 @@ public Schema primitive(Schema primitive) { return null; } - private Integer id(Schema schema, String propertyName, String mappedName) { - if (AvroSchemaUtil.hasProperty(schema, propertyName)) { - return AvroSchemaUtil.getId(schema, propertyName); - } else { - MappedField mappedField = mappedField(mappedName); - if (mappedField != null) { - return 
mappedField.id(); - } else { - return null; - } - } - } - - private Integer fieldId(Schema.Field field) { - if (AvroSchemaUtil.hasFieldId(field)) { - return AvroSchemaUtil.getFieldId(field); - } else { - MappedField mappedField = mappedField(field.name()); - if (mappedField != null) { - return mappedField.id(); - } else { - return null; - } - } - } - - private MappedField mappedField(String fieldName) { - Preconditions.checkState(nameMapping != null, - "Cannot find mapped field for field name %s. NameMapping is null", fieldName); - List fieldNames = Lists.newArrayList(fieldNames()); - fieldNames.add(fieldName); - return nameMapping.find(fieldNames); - } - private static Schema copyRecord(Schema record, List newFields) { Schema copy = Schema.createRecord(record.getName(), record.getDoc(), record.getNamespace(), record.isError(), newFields); From aa4c3592b92a35692e4545d9c2371cefbb209343 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 4 Oct 2019 07:40:20 -0700 Subject: [PATCH 11/15] Use checkArgument instead of checkState --- .../src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index b047a301b538..6993ba13d9f5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -216,7 +216,7 @@ static Integer getId(Schema schema, String propertyName, if (id != null) { return toInt(id); } else { - Preconditions.checkState(nameMapping != null, + Preconditions.checkArgument(nameMapping != null, "Schema does not have field id and name mapping not specified"); MappedField mappedField = nameMapping.find(names); if (mappedField != null) { @@ -286,7 +286,7 @@ static Integer getFieldId(Schema.Field field, NameMapping nameMapping, Iterable< if (id != null) { return toInt(id); } else { - 
Preconditions.checkState(nameMapping != null, "Field schema does not have id and name mapping not specified"); + Preconditions.checkArgument(nameMapping != null, "Field schema does not have id and name mapping not specified"); List names = Lists.newArrayList(parentFieldNames); names.add(field.name()); MappedField mappedField = nameMapping.find(names); From a6a63a9efb748ca107c1dc757ef361750bdb4c52 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 4 Oct 2019 10:08:01 -0700 Subject: [PATCH 12/15] Added minor comment --- core/src/main/java/org/apache/iceberg/avro/PruneColumns.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index a15356335e62..c9b161d3e0a0 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -58,11 +58,13 @@ public Schema record(Schema record, List names, List fields) { for (Schema.Field field : record.getFields()) { Integer fieldId = AvroSchemaUtil.getFieldId(field, nameMapping, fieldNames()); if (fieldId == null) { - // both the schema and the nameMapping does not have field id. We prune this field. + // Both the schema and the nameMapping does not have field id. We prune this field. 
continue; } if (!AvroSchemaUtil.hasFieldId(field)) { + // fieldId was resolved from nameMapping, we updated hasChange + // flag to make sure a new field is created with the field id hasChange = true; } From 8cf304b0f65a03b5ef4c07a6ee4e99fb4f9a3cfa Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Wed, 9 Oct 2019 10:40:39 -0700 Subject: [PATCH 13/15] Address comments --- .../apache/iceberg/avro/AvroSchemaUtil.java | 17 ++++++------- .../org/apache/iceberg/avro/PruneColumns.java | 25 +++++++++++++------ .../iceberg/avro/TestAvroNameMapping.java | 19 +++++++------- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 6993ba13d9f5..2d4943111b2b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -199,15 +199,13 @@ static Schema createProjectionMap(String recordName, return LogicalMap.get().addToSchema(Schema.createArray(keyValueRecord)); } - - static Integer getId(Schema schema, String propertyName) { + private static Integer getId(Schema schema, String propertyName) { Integer id = getId(schema, propertyName, null, null); Preconditions.checkNotNull(id, "Missing expected '%s' property", propertyName); return id; } - static Integer getId(Schema schema, String propertyName, - NameMapping nameMapping, List names) { + private static Integer getId(Schema schema, String propertyName, NameMapping nameMapping, List names) { if (schema.getType() == UNION) { return getId(fromOption(schema), propertyName, nameMapping, names); } @@ -215,15 +213,14 @@ static Integer getId(Schema schema, String propertyName, Object id = schema.getObjectProp(propertyName); if (id != null) { return toInt(id); - } else { - Preconditions.checkArgument(nameMapping != null, - "Schema does not have field id and name mapping not specified"); + } else if (nameMapping != 
null) { MappedField mappedField = nameMapping.find(names); if (mappedField != null) { return mappedField.id(); } - return null; } + + return null; } static boolean hasProperty(Schema schema, String propertyName) { @@ -285,8 +282,7 @@ static Integer getFieldId(Schema.Field field, NameMapping nameMapping, Iterable< Object id = field.getObjectProp(FIELD_ID_PROP); if (id != null) { return toInt(id); - } else { - Preconditions.checkArgument(nameMapping != null, "Field schema does not have id and name mapping not specified"); + } else if (nameMapping != null) { List names = Lists.newArrayList(parentFieldNames); names.add(field.name()); MappedField mappedField = nameMapping.find(names); @@ -294,6 +290,7 @@ static Integer getFieldId(Schema.Field field, NameMapping nameMapping, Iterable< return mappedField.id(); } } + return null; } diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index c9b161d3e0a0..0fd52124d7e3 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; import org.apache.iceberg.mapping.NameMapping; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +79,7 @@ public Schema record(Schema record, List names, List fields) { if (selectedIds.contains(fieldId)) { filteredFields.add(copyField(field, field.schema(), fieldId)); } else if (fieldSchema != null) { + hasChange = true; filteredFields.add(copyField(field, fieldSchema, fieldId)); } } @@ -134,13 +136,16 @@ public Schema array(Schema array, Schema element) { if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { return complexMapWithIds(array, keyId, valueId); } else if (element != null) { - if (keyValue.getField("key").schema() != element.getField("key").schema() || - 
keyValue.getField("value").schema() != element.getField("value").schema()) { - // key schemas can be different if new field ids were assigned to them - // the value must be a projection - return AvroSchemaUtil.createMap( - keyId, element.getField("key").schema(), - valueId, element.getField("value").schema()); + Schema keyProjection = element.getField("key").schema(); + Schema valueProjection = element.getField("value").schema(); + // key schemas can be different if new field ids were assigned to them + if (keyValue.getField("key").schema() != keyProjection) { + Preconditions.checkState( + SchemaNormalization.parsingFingerprint64(keyValue.getField("key").schema()) == + SchemaNormalization.parsingFingerprint64(keyProjection), "Map keys should not be projected"); + return AvroSchemaUtil.createMap(keyId, keyProjection, valueId, valueProjection); + } else if (keyValue.getField("value").schema() != valueProjection) { + return AvroSchemaUtil.createMap(keyId, keyProjection, valueId, valueProjection); + } else { + return complexMapWithIds(array, keyId, valueId); + } @@ -249,7 +254,11 @@ private static Schema.Field copyField(Schema.Field field, Schema newSchema, Inte copy.addProp(prop.getKey(), prop.getValue()); } - if (!AvroSchemaUtil.hasFieldId(field)) { + if (AvroSchemaUtil.hasFieldId(field)) { + int existingFieldId = AvroSchemaUtil.getFieldId(field); + Preconditions.checkArgument(existingFieldId == fieldId, + "Existing field does not match with that fetched from name mapping"); + } else { // field may not have a fieldId if the fieldId was fetched from nameMapping copy.addProp(AvroSchemaUtil.FIELD_ID_PROP, fieldId); } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index 082938ba9e02..ff5b7e90b0a3 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -23,6 +23,7 @@ import 
java.io.IOException; import java.util.Collections; import java.util.Set; +import org.apache.avro.SchemaNormalization; import org.apache.avro.generic.GenericData; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; @@ -241,22 +242,20 @@ private void check(Schema writeSchema, Schema readSchema, NameMapping nameMappin org.apache.avro.Schema expected = project(writeSchema, readSchema); org.apache.avro.Schema actual = projectWithNameMapping(writeSchema, readSchema, nameMapping); - // projected/read schema built using external mapping should match - // projected/read schema built with file schema having field ids - // `BuildAvroProjection` can skip adding fields ids in some cases. // e.g when creating a map if the value is modified. This leads to // test failures. For now I've removed ids to perform equality testing. - Assert.assertEquals(removeIds(expected), removeIds(actual)); - // Projected/read schema built using external mapping will always match expected Iceberg schema - Assert.assertEquals(removeIds(actual), removeIds(AvroSchemaUtil.convert(readSchema, "table"))); + Assert.assertEquals( + "Read schema built using external mapping should match read schema built with file schema having field ids", + SchemaNormalization.parsingFingerprint64(expected), + SchemaNormalization.parsingFingerprint64(actual)); + Assert.assertEquals( + "Projected/read schema built using external mapping will always match expected Iceberg schema", + SchemaNormalization.parsingFingerprint64(AvroSchemaUtil.convert(readSchema, "table")), + SchemaNormalization.parsingFingerprint64(actual)); } private static org.apache.avro.Schema removeIds(Schema schema) { return AvroSchemaVisitor.visit(AvroSchemaUtil.convert(schema.asStruct(), "table"), new RemoveIds()); } - - private static org.apache.avro.Schema removeIds(org.apache.avro.Schema schema) { - return AvroSchemaVisitor.visit(schema, new RemoveIds()); - } } From aea7a30f67cc1ce4fdc45fd2ed5c6898fcf884c2 Mon Sep 17 00:00:00 2001 
From: Ratandeep Ratt Date: Mon, 14 Oct 2019 14:56:26 -0700 Subject: [PATCH 14/15] * Modify tests to write and read avro records without ids, using NameMapping, * Change the read schema to rename a field uniquely if that field is not projected by NameMapping --- .gitignore | 6 + .../iceberg/avro/BuildAvroProjection.java | 7 +- .../iceberg/avro/TestAvroNameMapping.java | 270 +++++++++--------- 3 files changed, 143 insertions(+), 140 deletions(-) diff --git a/.gitignore b/.gitignore index c7a0af84e8c7..4bdf25c0d5d6 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ out # web site build site/site +# vscode +.project + __pycache__/ *.py[cod] .eggs/ @@ -25,3 +28,6 @@ sdist/ coverage.xml .pytest_cache/ spark/tmp/ + +# gradle +gradle/ diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index c4ae1678c9ef..8d18d96ce20d 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -94,9 +94,10 @@ public Schema record(Schema record, List names, Iterable s } else { Preconditions.checkArgument(field.isOptional(), "Missing required field: %s", field.name()); - // create a field that will be defaulted to null + // Create a field that will be defaulted to null. We assign a unique suffix to the field + // to make sure that even if records in the file have the field it is not projected. 
Schema.Field newField = new Schema.Field( - field.name(), + field.name() + "_r" + field.fieldId(), AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE); newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId()); updatedFields.add(newField); @@ -115,7 +116,7 @@ public Schema record(Schema record, List names, Iterable s public Schema.Field field(Schema.Field field, Supplier fieldResult) { Types.StructType struct = current.asNestedType().asStructType(); int fieldId = AvroSchemaUtil.getFieldId(field); - Types.NestedField expectedField = struct.field(fieldId); // TODO: what if there are no ids? + Types.NestedField expectedField = struct.field(fieldId); // if the field isn't present, it was not selected if (expectedField == null) { diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index ff5b7e90b0a3..2cc5408fe9bf 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -19,133 +19,106 @@ package org.apache.iceberg.avro; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.io.File; import java.io.IOException; -import java.util.Collections; -import java.util.Set; -import org.apache.avro.SchemaNormalization; +import java.util.List; +import java.util.Map; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumWriter; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.MappedFields; import org.apache.iceberg.mapping.MappingUtil; import 
org.apache.iceberg.mapping.NameMapping; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Test; -public class TestAvroNameMapping extends TestAvroReadProjection { - @Override - protected GenericData.Record writeAndRead(String desc, - Schema writeSchema, - Schema readSchema, - GenericData.Record inputRecord) throws IOException { - - // Use all existing TestAvroReadProjection tests to verify that - // we get the same projected (Avro) read schema whether we use - // NameMapping together with file schema without field-ids or we - // use a file schema having field-ids - GenericData.Record record = super.writeAndRead(desc, writeSchema, readSchema, inputRecord); - org.apache.avro.Schema expected = record.getSchema(); - org.apache.avro.Schema actual = projectWithNameMapping(writeSchema, readSchema, MappingUtil.create(writeSchema)); - Assert.assertEquals(expected, actual); - return record; - } +import static org.apache.avro.generic.GenericData.Record; +public class TestAvroNameMapping extends TestAvroReadProjection { @Test - public void testNameMappingProjections() { - Schema fileSchema = new Schema( + public void testMapProjections() throws IOException { + Schema writeSchema = new Schema( Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7, + Types.NestedField.optional(5, "location", Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StructType.of( Types.NestedField.required(1, "lat", Types.FloatType.get()), - Types.NestedField.required(2, "long", Types.FloatType.get()) + Types.NestedField.optional(2, "long", Types.FloatType.get()) ) ))); - // Table mapping does not project `locations` map + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record location = new Record(AvroSchemaUtil.fromOption( + 
AvroSchemaUtil.fromOption(record.getSchema().getField("location").schema()) + .getValueType())); + location.put("lat", 52.995143f); + location.put("long", -1.539054f); + record.put("location", ImmutableMap.of("l1", location)); + + // Table mapping does not project `location` map NameMapping nameMapping = MappingUtil.create(new Schema( Types.NestedField.required(0, "id", Types.LongType.get()))); - // Table read schema projects `locations` map's value - Schema readSchema = new Schema( + Schema readSchema = writeSchema; + + Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping); + // field id 5 comes from read schema + Assert.assertNotNull("Field missing from table mapping is renamed", projected.getSchema().getField("location_r5")); + Assert.assertNull("location field should not be read", projected.get("location_r5")); + Assert.assertEquals(34L, projected.get("id")); + + // Table mapping partially project `location` map value + nameMapping = MappingUtil.create(new Schema( Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7, + Types.NestedField.optional(5, "location", Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StructType.of( - Types.NestedField.required(2, "long", Types.FloatType.get()) - ) - ))); + Types.NestedField.required(1, "lat", Types.FloatType.get())))))); - check(fileSchema, readSchema, nameMapping); + projected = writeAndRead(writeSchema, readSchema, record, nameMapping); + Record projectedL1 = ((Map) projected.get("location")).get("l1"); + Assert.assertNotNull("Field missing from table mapping is renamed", projectedL1.getSchema().getField("long_r2")); + Assert.assertNull("location.value.long, should not be read", projectedL1.get("long_r2")); } @Test - public void testMissingFields() { - Schema fileSchema = new Schema( + public void testMissingRequiredFields() { + Schema writeSchema = new Schema( Types.NestedField.required(19, "x", 
Types.IntegerType.get()), Types.NestedField.optional(18, "y", Types.IntegerType.get())); + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("x", 1); + record.put("y", 2); + // table mapping not projecting a required field 'x' NameMapping nameMapping = MappingUtil.create(new Schema( Types.NestedField.optional(18, "y", Types.IntegerType.get()))); - Schema readSchema = fileSchema; + + Schema readSchema = writeSchema; AssertHelpers.assertThrows("Missing required field in nameMapping", IllegalArgumentException.class, "Missing required field: x", // In this case, pruneColumns result is an empty record - () -> projectWithNameMapping(fileSchema, readSchema, nameMapping)); + () -> writeAndRead(writeSchema, readSchema, record, nameMapping)); } @Test - public void testComplexMapKeys() { - Schema fileSchema = new Schema( - Types.NestedField.required(5, "locations", Types.MapType.ofOptional(6, 7, - Types.StructType.of( - Types.NestedField.required(3, "k1", Types.StringType.get()), - Types.NestedField.optional(4, "k2", Types.StringType.get()) - ), - Types.StructType.of( - Types.NestedField.required(1, "lat", Types.FloatType.get()), - Types.NestedField.optional(2, "long", Types.FloatType.get()) - ) - ))); - - // project a subset of the map's value columns in NameMapping - NameMapping nameMapping = MappingUtil.create(new Schema( - Types.NestedField.required(5, "locations", Types.MapType.ofOptional(6, 7, - Types.StructType.of( - Types.NestedField.required(3, "k1", Types.StringType.get()), - Types.NestedField.optional(4, "k2", Types.StringType.get()) - ), - Types.StructType.of( - Types.NestedField.required(1, "lat", Types.FloatType.get()) - ) - )))); - - Schema readSchema = new Schema( - Types.NestedField.required(5, "locations", Types.MapType.ofOptional(6, 7, - Types.StructType.of( - Types.NestedField.required(3, "k1", Types.StringType.get()), - Types.NestedField.optional(4, "k2", Types.StringType.get()) - ), - Types.StructType.of( - 
Types.NestedField.required(1, "lat", Types.FloatType.get()), - Types.NestedField.optional(2, "long", Types.FloatType.get()) - ) - ))); - - check(fileSchema, readSchema, nameMapping); - } - - @Test - public void testArrayProjections() { + public void testArrayProjections() throws Exception { Schema writeSchema = new Schema( Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(22, "points", + Types.NestedField.optional(22, "point", Types.ListType.ofOptional(21, Types.StructType.of( Types.NestedField.required(19, "x", Types.IntegerType.get()), Types.NestedField.optional(18, "y", Types.IntegerType.get()) @@ -153,109 +126,132 @@ public void testArrayProjections() { ) ); + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record pointRecord = new Record(AvroSchemaUtil.fromOption( + AvroSchemaUtil.fromOption(record.getSchema().getField("point").schema()).getElementType())); + pointRecord.put("x", 1); + pointRecord.put("y", 2); + record.put("point", ImmutableList.of(pointRecord)); + NameMapping nameMapping = MappingUtil.create(new Schema( - // Optional array field missing. Will be filled in - // using default values using read schema + // Optional array field missing. 
Types.NestedField.required(0, "id", Types.LongType.get()))); Schema readSchema = new Schema( Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(22, "points", + Types.NestedField.optional(22, "point", Types.ListType.ofOptional(21, Types.StructType.of( - Types.NestedField.required(19, "x", Types.IntegerType.get()) + Types.NestedField.required(19, "x", Types.IntegerType.get()), + Types.NestedField.optional(18, "y", Types.IntegerType.get()) )) ) ); - check(writeSchema, readSchema, nameMapping); + Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping); + Assert.assertNotNull("Field missing from table mapping is renamed", projected.getSchema().getField("point_r22")); + Assert.assertNull("point field is not projected", projected.get("point_r22")); + Assert.assertEquals(34L, projected.get("id")); - // points array is partially projected + // point array is partially projected nameMapping = MappingUtil.create(new Schema( Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(22, "points", + Types.NestedField.optional(22, "point", Types.ListType.ofOptional(21, Types.StructType.of( Types.NestedField.required(19, "x", Types.IntegerType.get()))) ) )); - check(writeSchema, readSchema, nameMapping); + projected = writeAndRead(writeSchema, readSchema, record, nameMapping); + Record point = ((List) projected.get("point")).get(0); + + Assert.assertNotNull("Field missing from table mapping is renamed", point.getSchema().getField("y_r18")); + Assert.assertEquals("point.x is projected", 1, point.get("x")); + Assert.assertNull("point.y is not projected", point.get("y_r18")); + Assert.assertEquals(34L, projected.get("id")); } @Test - public void testAliases() { - Schema fileSchema = new Schema( + public void testAliases() throws IOException { + Schema writeSchema = new Schema( Types.NestedField.optional(22, "points", Types.ListType.ofOptional(21, Types.StructType.of( - 
Types.NestedField.required(19, "x", Types.IntegerType.get()) - )) - ) - ); + Types.NestedField.required(19, "x", Types.IntegerType.get()))))); - NameMapping nameMapping = NameMapping.of(MappedFields.of( - MappedField.of(22, "points", MappedFields.of( - MappedField.of(19, Lists.newArrayList("x", "y", "z")) - )))); + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + Record pointRecord = new Record(AvroSchemaUtil.fromOption( + AvroSchemaUtil.fromOption(record.getSchema().getField("points").schema()).getElementType())); + pointRecord.put("x", 1); + record.put("points", ImmutableList.of(pointRecord)); + NameMapping nameMapping = NameMapping.of( + MappedFields.of( + MappedField.of(22, "points", MappedFields.of( + MappedField.of(21, "element", MappedFields.of( + // Name mapping has aliases for field x + MappedField.of(19, Lists.newArrayList("x", "y", "z")))))))); Schema readSchema = new Schema( Types.NestedField.optional(22, "points", Types.ListType.ofOptional(21, Types.StructType.of( // x renamed to y - Types.NestedField.required(19, "y", Types.IntegerType.get()) - )) - ) - ); + Types.NestedField.required(19, "y", Types.IntegerType.get()))))); - check(fileSchema, readSchema, nameMapping); + Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping); + Assert.assertEquals("x is read as y", 1, ((List) projected.get("points")).get(0).get("y")); readSchema = new Schema( Types.NestedField.optional(22, "points", Types.ListType.ofOptional(21, Types.StructType.of( // x renamed to z - Types.NestedField.required(19, "z", Types.IntegerType.get()) - )) - ) - ); + Types.NestedField.required(19, "z", Types.IntegerType.get()))))); - check(fileSchema, readSchema, nameMapping); + projected = writeAndRead(writeSchema, readSchema, record, nameMapping); + Assert.assertEquals("x is read as z", 1, ((List) projected.get("points")).get(0).get("z")); } + @Override + protected Record writeAndRead(String desc, + Schema writeSchema, + Schema readSchema, 
+ Record inputRecord) throws IOException { - private static org.apache.avro.Schema project(Schema writeSchema, Schema readSchema) { - // Build a read schema when file schema has field ids - org.apache.avro.Schema avroFileSchema = AvroSchemaUtil.convert(writeSchema.asStruct(), "table"); - Set projectedIds = TypeUtil.getProjectedIds(readSchema); - org.apache.avro.Schema prunedFileSchema = AvroSchemaUtil.pruneColumns(avroFileSchema, projectedIds, null); - return AvroSchemaUtil.buildAvroProjection(prunedFileSchema, readSchema, Collections.emptyMap()); + // Use all existing TestAvroReadProjection tests to verify that + // we get the same projected Avro record whether we use + // NameMapping together with file schema without field-ids or we + // use a file schema having field-ids + Record record = super.writeAndRead(desc, writeSchema, readSchema, inputRecord); + Record projectedWithNameMapping = writeAndRead( + writeSchema, readSchema, inputRecord, MappingUtil.create(writeSchema)); + Assert.assertEquals(record, projectedWithNameMapping); + return record; } - private static org.apache.avro.Schema projectWithNameMapping( - Schema writeSchema, Schema readSchema, NameMapping nameMapping) { - // Build a read schema when file schema does not have field ids . The field ids are provided by `nameMapping` - org.apache.avro.Schema avroFileSchema = removeIds(writeSchema); - Set projectedIds = TypeUtil.getProjectedIds(readSchema); - org.apache.avro.Schema prunedFileSchema = AvroSchemaUtil.pruneColumns(avroFileSchema, projectedIds, nameMapping); - return AvroSchemaUtil.buildAvroProjection(prunedFileSchema, readSchema, Collections.emptyMap()); - } - private void check(Schema writeSchema, Schema readSchema, NameMapping nameMapping) { - org.apache.avro.Schema expected = project(writeSchema, readSchema); - org.apache.avro.Schema actual = projectWithNameMapping(writeSchema, readSchema, nameMapping); - - // `BuildAvroProjection` can skip adding fields ids in some cases. 
- // e.g when creating a map if the value is modified. This leads to - // test failures. For now I've removed ids to perform equality testing. - Assert.assertEquals( - "Read schema built using external mapping should match read schema built with file schema having field ids", - SchemaNormalization.parsingFingerprint64(expected), - SchemaNormalization.parsingFingerprint64(actual)); - Assert.assertEquals( - "Projected/read schema built using external mapping will always match expected Iceberg schema", - SchemaNormalization.parsingFingerprint64(AvroSchemaUtil.convert(readSchema, "table")), - SchemaNormalization.parsingFingerprint64(actual)); + private Record writeAndRead(Schema writeSchema, + Schema readSchema, + Record record, + NameMapping nameMapping) throws IOException { + + File file = temp.newFile(); + // Write without file ids + org.apache.avro.Schema writeAvroSchema = removeIds(writeSchema); + DatumWriter datumWriter = new GenericDatumWriter<>(writeAvroSchema); + try (DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(writeAvroSchema, file); + dataFileWriter.append(record); + } + + Iterable records = Avro.read(Files.localInput(file)) + .project(readSchema) + .nameMapping(nameMapping) + .build(); + + return Iterables.getOnlyElement(records); } private static org.apache.avro.Schema removeIds(Schema schema) { return AvroSchemaVisitor.visit(AvroSchemaUtil.convert(schema.asStruct(), "table"), new RemoveIds()); } + } From 0b4525bcc8bec1964bef0cf392eddb97cd1be62e Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Mon, 21 Oct 2019 19:04:46 -0700 Subject: [PATCH 15/15] Address review comments --- .gitignore | 6 ------ .../apache/iceberg/avro/TestAvroNameMapping.java | 14 ++------------ 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 4bdf25c0d5d6..c7a0af84e8c7 100644 --- a/.gitignore +++ b/.gitignore @@ -10,9 +10,6 @@ out # web site build site/site -# vscode -.project - __pycache__/ 
*.py[cod] .eggs/ @@ -28,6 +25,3 @@ sdist/ coverage.xml .pytest_cache/ spark/tmp/ - -# gradle -gradle/ diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index 2cc5408fe9bf..2ad70a125ce5 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -106,7 +106,6 @@ public void testMissingRequiredFields() { NameMapping nameMapping = MappingUtil.create(new Schema( Types.NestedField.optional(18, "y", Types.IntegerType.get()))); - Schema readSchema = writeSchema; AssertHelpers.assertThrows("Missing required field in nameMapping", IllegalArgumentException.class, "Missing required field: x", @@ -138,15 +137,7 @@ public void testArrayProjections() throws Exception { // Optional array field missing. Types.NestedField.required(0, "id", Types.LongType.get()))); - Schema readSchema = new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(22, "point", - Types.ListType.ofOptional(21, Types.StructType.of( - Types.NestedField.required(19, "x", Types.IntegerType.get()), - Types.NestedField.optional(18, "y", Types.IntegerType.get()) - )) - ) - ); + Schema readSchema = writeSchema; Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping); Assert.assertNotNull("Field missing from table mapping is renamed", projected.getSchema().getField("point_r22")); @@ -188,8 +179,7 @@ public void testAliases() throws IOException { MappedFields.of( MappedField.of(22, "points", MappedFields.of( MappedField.of(21, "element", MappedFields.of( - // Name mapping has aliases for field x - MappedField.of(19, Lists.newArrayList("x", "y", "z")))))))); + MappedField.of(19, Lists.newArrayList("x")))))))); Schema readSchema = new Schema( Types.NestedField.optional(22, "points",