From 7212fa2a0e65c614f33d3e8735bad2a6e546d9c1 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Nov 2019 12:36:18 -0700 Subject: [PATCH 1/8] Fix Parquet with special characters in field names. --- .../java/org/apache/iceberg/avro/AvroSchemaUtil.java | 7 +++++++ .../apache/iceberg/parquet/TypeToMessageType.java | 12 +++++++----- .../iceberg/spark/data/TestSparkParquetWriter.java | 3 ++- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index a34dcf553beb..d55ed5a4aa5c 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -347,6 +347,13 @@ static Schema.Field copyField(Schema.Field field, Schema newSchema, String newNa return copy; } + public static String makeCompatibleName(String name) { + if (!validAvroName(name)) { + return sanitize(name); + } + return name; + } + static boolean validAvroName(String name) { int length = name.length(); Preconditions.checkArgument(length > 0, "Empty name"); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index 7581520c354e..275710492bc5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -20,6 +20,7 @@ package org.apache.iceberg.parquet; import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.types.Type.NestedType; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; @@ -58,7 +59,7 @@ public MessageType convert(Schema schema, String name) { builder.addField(field(field)); } - return builder.named(name); + return builder.named(AvroSchemaUtil.makeCompatibleName(name)); } public GroupType struct(StructType struct, Type.Repetition repetition, int id, String name) { @@ -68,7 +69,7 @@ public GroupType struct(StructType struct, Type.Repetition repetition, int id, S builder.addField(field(field)); } - return builder.id(id).named(name); + return builder.id(id).named(AvroSchemaUtil.makeCompatibleName(name)); } public Type field(NestedField field) { @@ -98,7 +99,7 @@ public GroupType list(ListType list, Type.Repetition repetition, int id, String return Types.list(repetition) .element(field(elementField)) .id(id) - .named(name); + .named(AvroSchemaUtil.makeCompatibleName(name)); } public GroupType map(MapType map, Type.Repetition repetition, int id, String name) { @@ -108,10 +109,11 @@ public GroupType map(MapType map, Type.Repetition repetition, int id, String nam .key(field(keyField)) .value(field(valueField)) .id(id) - .named(name); + .named(AvroSchemaUtil.makeCompatibleName(name)); } - public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int id, String name) { + public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int id, String originalName) { + String name = AvroSchemaUtil.makeCompatibleName(originalName); switch (primitive.typeId()) { case BOOLEAN: return Types.primitive(BOOLEAN, repetition).id(id).named(name); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java index 97fa11e80585..00f95f382a15 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java @@ -62,7 +62,8 @@ public class TestSparkParquetWriter { optional(19, "renovate", Types.MapType.ofRequired(20, 21, Types.StringType.get(), Types.StructType.of( optional(22, "jumpy", Types.DoubleType.get()), - required(23, "koala", Types.IntegerType.get()) + required(23, "koala", Types.IntegerType.get()), + required(24, "couch rope", Types.IntegerType.get()) ))), optional(2, "slide", Types.StringType.get()) ); From 493158fa645ddab23bbf3ba4264b368ff8836d0f Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Nov 2019 13:43:46 -0700 Subject: [PATCH 2/8] More tests to validate special characters in field names. This also fixes Avro's special character handling. --- .../java/org/apache/iceberg/avro/Avro.java | 23 +++++++++++++-- .../iceberg/avro/BuildAvroProjection.java | 4 +-- .../apache/iceberg/data/avro/DataReader.java | 28 +++++++++++-------- .../iceberg/data/avro/IcebergDecoder.java | 4 ++- .../iceberg/data/TestReadProjection.java | 26 +++++++++++++++++ 5 files changed, 68 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 760b80588cd5..d7341291da33 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Locale; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; @@ -174,7 +175,9 @@ public static class ReadBuilder { private NameMapping nameMapping; private boolean reuseContainers = false; private org.apache.iceberg.Schema schema = null; - private Function> createReaderFunc = readSchema -> { + private Function> createReaderFunc = null; + private BiFunction> createReaderBiFunc = null; + private final Function> defaultCreateReaderFunc = readSchema -> { GenericAvroReader reader = new GenericAvroReader<>(readSchema); reader.setClassLoader(defaultLoader); return reader; @@ -188,10 +191,17 @@ private ReadBuilder(InputFile file) { } public ReadBuilder createReaderFunc(Function> readerFunction) { + Preconditions.checkState(createReaderBiFunc == null, "Cannot set multiple createReaderFunc"); this.createReaderFunc = readerFunction; return this; } + public ReadBuilder createReaderFunc(BiFunction> readerFunction) { + Preconditions.checkState(createReaderFunc == null, "Cannot set multiple createReaderFunc"); + this.createReaderBiFunc = readerFunction; + return this; + } + /** * Restricts the read to the given range: [start, end = start + length). * @@ -232,8 +242,17 @@ public ReadBuilder nameMapping(NameMapping newNameMapping) { public AvroIterable build() { Preconditions.checkNotNull(schema, "Schema is required"); + Function> readerFunc; + if (createReaderBiFunc != null) { + readerFunc = avroSchema -> createReaderBiFunc.apply(schema, avroSchema); + } else if (createReaderFunc != null) { + readerFunc = createReaderFunc; + } else { + readerFunc = defaultCreateReaderFunc; + } + return new AvroIterable<>(file, - new ProjectionDatumReader<>(createReaderFunc, schema, renames, nameMapping), + new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping), start, length, reuseContainers); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index 8d18d96ce20d..35b8981ca0fe 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -87,7 +87,7 @@ public Schema record(Schema record, List names, Iterable s hasChange = true; } - Schema.Field avroField = updateMap.get(field.name()); + Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name())); if (avroField != null) { updatedFields.add(avroField); @@ -131,7 +131,7 @@ public Schema.Field field(Schema.Field field, Supplier fieldResult) { if (schema != field.schema() || !expectedName.equals(field.name())) { // add an alias for the field - return AvroSchemaUtil.copyField(field, schema, expectedName); + return AvroSchemaUtil.copyField(field, schema, AvroSchemaUtil.makeCompatibleName(expectedName)); } else { // always copy because fields can't be reused return AvroSchemaUtil.copyField(field, field.schema(), field.name()); diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java index 122173d983f4..2964b31f5a7b 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -32,19 +32,21 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.ResolvingDecoder; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.avro.AvroSchemaVisitor; +import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor; import org.apache.iceberg.avro.LogicalMap; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; public class DataReader implements DatumReader { private static final ThreadLocal>> DECODER_CACHES = ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap()); - public static DataReader create(Schema readSchema) { - return new DataReader<>(readSchema); + public static DataReader create(org.apache.iceberg.Schema expectedSchema, Schema readSchema) { + return new DataReader<>(expectedSchema, readSchema); } private final Schema readSchema; @@ -52,9 +54,9 @@ public static DataReader create(Schema readSchema) { private Schema fileSchema = null; @SuppressWarnings("unchecked") - private DataReader(Schema readSchema) { + private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) { this.readSchema = readSchema; - this.reader = (ValueReader) AvroSchemaVisitor.visit(readSchema, new ReadBuilder()); + this.reader = (ValueReader) AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder()); } @Override @@ -94,22 +96,24 @@ private ResolvingDecoder newResolver() { } } - private static class ReadBuilder extends AvroSchemaVisitor> { + private static class ReadBuilder extends AvroSchemaWithTypeVisitor> { + private ReadBuilder() { } @Override - public ValueReader record(Schema record, List names, List> fields) { - return GenericReaders.struct(AvroSchemaUtil.convert(record).asStructType(), fields); + public ValueReader record(Types.StructType struct, Schema record, + List names, List> fields) { + return GenericReaders.struct(struct, fields); } @Override - public ValueReader union(Schema union, List> options) { + public ValueReader union(Type ignored, Schema union, List> options) { return ValueReaders.union(options); } @Override - public ValueReader array(Schema array, ValueReader elementReader) { + public ValueReader array(Types.ListType ignored, Schema array, ValueReader elementReader) { if (array.getLogicalType() instanceof LogicalMap) { ValueReaders.StructReader keyValueReader = (ValueReaders.StructReader) elementReader; ValueReader keyReader = keyValueReader.reader(0); @@ -122,12 +126,12 @@ public ValueReader array(Schema array, ValueReader elementReader) { } @Override - public ValueReader map(Schema map, ValueReader valueReader) { + public ValueReader map(Types.MapType ignored, Schema map, ValueReader valueReader) { return ValueReaders.map(ValueReaders.strings(), valueReader); } @Override - public ValueReader primitive(Schema primitive) { + public ValueReader primitive(Type.PrimitiveType ignored, Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); if (logicalType != null) { switch (logicalType.getName()) { diff --git a/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java b/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java index 97a50e9c29e7..42b433f53185 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java +++ b/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java @@ -166,7 +166,9 @@ private static class RawDecoder extends MessageDecoder.BaseDecoder { * @param writeSchema the schema used to decode buffers */ private RawDecoder(org.apache.iceberg.Schema readSchema, org.apache.avro.Schema writeSchema) { - this.reader = new ProjectionDatumReader<>(DataReader::create, readSchema, ImmutableMap.of(), null); + this.reader = new ProjectionDatumReader<>( + avroSchema -> DataReader.create(readSchema, avroSchema), + readSchema, ImmutableMap.of(), null); this.reader.setSchema(writeSchema); } diff --git a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java index 54b76d8c8647..e22b4b7c6089 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java +++ b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java @@ -63,6 +63,32 @@ public void testFullProjection() throws Exception { Assert.assertTrue("Should contain the correct data value", cmp == 0); } + @Test + public void testSpecialCharacterProjection() throws Exception { + Schema schema = new Schema( + Types.NestedField.required(0, "user id", Types.LongType.get()), + Types.NestedField.optional(1, "data%0", Types.StringType.get()) + ); + + Record record = GenericRecord.create(schema.asStruct()); + record.setField("user id", 34L); + record.setField("data%0", "test"); + + Record full = writeAndRead("special_chars", schema, schema, record); + + Assert.assertEquals("Should contain the correct id value", 34L, (long) full.getField("user id")); + Assert.assertEquals("Should contain the correct data value", + 0, + Comparators.charSequences().compare("test", (CharSequence) full.getField("data%0"))); + + Record projected = writeAndRead("full_projection", schema, schema.select("data%0"), record); + + Assert.assertNull("Should not contain id value", projected.getField("user id")); + Assert.assertEquals("Should contain the correct data value", + 0, + Comparators.charSequences().compare("test", (CharSequence) projected.getField("data%0"))); + } + @Test public void testReorderedFullProjection() throws Exception { Schema schema = new Schema( From f93823b20a22a29fee478d2e3eb547d2a7a33492 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Nov 2019 13:46:06 -0700 Subject: [PATCH 3/8] Add missing Visitor. --- .../avro/AvroSchemaWithTypeVisitor.java | 109 ++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java new file mode 100644 index 000000000000..e28c4eb7d349 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.Deque; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +public abstract class AvroSchemaWithTypeVisitor { + public static T visit(org.apache.iceberg.Schema iSchema, Schema schema, AvroSchemaWithTypeVisitor visitor) { + return visit(iSchema.asStruct(), schema, visitor); + } + + public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor visitor) { + switch (schema.getType()) { + case RECORD: + Types.StructType struct = iType != null ? iType.asStructType() : null; + + // check to make sure this hasn't been visited before + String name = schema.getFullName(); + Preconditions.checkState(!visitor.recordLevels.contains(name), + "Cannot process recursive Avro record %s", name); + + visitor.recordLevels.push(name); + + List fields = schema.getFields(); + List names = Lists.newArrayListWithExpectedSize(fields.size()); + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (Schema.Field field : schema.getFields()) { + int fieldId = AvroSchemaUtil.getFieldId(field); + Types.NestedField iField = struct != null ? struct.field(fieldId) : null; + names.add(field.name()); + results.add(visit(iField != null ? iField.type() : null, field.schema(), visitor)); + } + + visitor.recordLevels.pop(); + + return visitor.record(struct, schema, names, results); + + case UNION: + List types = schema.getTypes(); + List options = Lists.newArrayListWithExpectedSize(types.size()); + for (Schema type : types) { + if (type.getType() == Schema.Type.NULL) { + options.add(visit((Type) null, type, visitor)); + } else { + options.add(visit(iType, type, visitor)); + } + } + return visitor.union(iType, schema, options); + + case ARRAY: + Types.ListType list = iType != null ? iType.asListType() : null; + return visitor.array(list, schema, + visit(list != null ? list.elementType() : null, schema.getElementType(), visitor)); + + case MAP: + Types.MapType map = iType != null ? iType.asMapType() : null; + return visitor.map(map, schema, + visit(map != null ? map.valueType() : null, schema.getValueType(), visitor)); + + default: + return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, schema); + } + } + + private Deque recordLevels = Lists.newLinkedList(); + + public T record(Types.StructType iStruct, Schema record, List names, List fields) { + return null; + } + + public T union(Type iType, Schema union, List options) { + return null; + } + + public T array(Types.ListType iList, Schema array, T element) { + return null; + } + + public T map(Types.MapType iMap, Schema map, T value) { + return null; + } + + public T primitive(Type.PrimitiveType iPrimitive, Schema primitive) { + return null; + } +} From 04e78f2f374d703284639b1c5152a251a2d8019a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Nov 2019 14:04:26 -0700 Subject: [PATCH 4/8] Slight refactor for complexity. --- .../avro/AvroSchemaWithTypeVisitor.java | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java index e28c4eb7d349..b9ee15b770a8 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -36,27 +36,7 @@ public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor fields = schema.getFields(); - List names = Lists.newArrayListWithExpectedSize(fields.size()); - List results = Lists.newArrayListWithExpectedSize(fields.size()); - for (Schema.Field field : schema.getFields()) { - int fieldId = AvroSchemaUtil.getFieldId(field); - Types.NestedField iField = struct != null ? struct.field(fieldId) : null; - names.add(field.name()); - results.add(visit(iField != null ? iField.type() : null, field.schema(), visitor)); - } - - visitor.recordLevels.pop(); - - return visitor.record(struct, schema, names, results); + return visitRecord(struct, schema, visitor); case UNION: List types = schema.getTypes(); @@ -85,6 +65,29 @@ public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor T visitRecord(Types.StructType struct, Schema record, AvroSchemaWithTypeVisitor visitor) { + // check to make sure this hasn't been visited before + String name = record.getFullName(); + Preconditions.checkState(!visitor.recordLevels.contains(name), + "Cannot process recursive Avro record %s", name); + + visitor.recordLevels.push(name); + + List fields = record.getFields(); + List names = Lists.newArrayListWithExpectedSize(fields.size()); + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (Schema.Field field : fields) { + int fieldId = AvroSchemaUtil.getFieldId(field); + Types.NestedField iField = struct != null ? struct.field(fieldId) : null; + names.add(field.name()); + results.add(visit(iField != null ? iField.type() : null, field.schema(), visitor)); + } + + visitor.recordLevels.pop(); + + return visitor.record(struct, record, names, results); + } + private Deque recordLevels = Lists.newLinkedList(); public T record(Types.StructType iStruct, Schema record, List names, List fields) { From 1e9230563b157b885e744cd129afe65f5ef72791 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Nov 2019 14:54:19 -0700 Subject: [PATCH 5/8] Fix logical maps. --- .../avro/AvroSchemaWithTypeVisitor.java | 19 ++++++++++++++++--- .../apache/iceberg/data/avro/DataReader.java | 13 +++++-------- .../spark/data/TestParquetAvroReader.java | 3 ++- .../spark/data/TestParquetAvroWriter.java | 3 ++- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java index b9ee15b770a8..d99693d7667a 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -51,9 +51,18 @@ public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor keyValueFields = schema.getElementType().getFields(); + return visitor.map(map, schema, + visit(map != null ? map.keyType() : null, keyValueFields.get(0).schema(), visitor), + visit(map != null ? map.valueType() : null, keyValueFields.get(1).schema(), visitor)); + + } else { + Types.ListType list = iType != null ? iType.asListType() : null; + return visitor.array(list, schema, + visit(list != null ? list.elementType() : null, schema.getElementType(), visitor)); + } case MAP: Types.MapType map = iType != null ? iType.asMapType() : null; @@ -102,6 +111,10 @@ public T array(Types.ListType iList, Schema array, T element) { return null; } + public T map(Types.MapType iMap, Schema map, T key, T value) { + return null; + } + public T map(Types.MapType iMap, Schema map, T value) { return null; } diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java index 2964b31f5a7b..0b132531ebcf 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -114,17 +114,14 @@ public ValueReader union(Type ignored, Schema union, List> opt @Override public ValueReader array(Types.ListType ignored, Schema array, ValueReader elementReader) { - if (array.getLogicalType() instanceof LogicalMap) { - ValueReaders.StructReader keyValueReader = (ValueReaders.StructReader) elementReader; - ValueReader keyReader = keyValueReader.reader(0); - ValueReader valueReader = keyValueReader.reader(1); - - return ValueReaders.arrayMap(keyReader, valueReader); - } - return ValueReaders.array(elementReader); } + @Override + public ValueReader map(Types.MapType iMap, Schema map, ValueReader keyReader, ValueReader valueReader) { + return ValueReaders.arrayMap(keyReader, valueReader); + } + @Override public ValueReader map(Types.MapType ignored, Schema map, ValueReader valueReader) { return ValueReaders.map(ValueReaders.strings(), valueReader); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java index 62ba52e5f2b1..1466deab2af2 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java @@ -66,7 +66,8 @@ public class TestParquetAvroReader { optional(19, "renovate", Types.MapType.ofRequired(20, 21, Types.StringType.get(), Types.StructType.of( optional(22, "jumpy", Types.DoubleType.get()), - required(23, "koala", Types.TimeType.get()) + required(23, "koala", Types.TimeType.get()), + required(24, "couch rope", Types.IntegerType.get()) ))), optional(2, "slide", Types.StringType.get()) ); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java index 99fe7a952a6a..0e97c37ffe79 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java @@ -66,7 +66,8 @@ public class TestParquetAvroWriter { optional(19, "renovate", Types.MapType.ofRequired(20, 21, Types.StringType.get(), Types.StructType.of( optional(22, "jumpy", Types.DoubleType.get()), - required(23, "koala", Types.TimeType.get()) + required(23, "koala", Types.TimeType.get()), + required(24, "couch rope", Types.IntegerType.get()) ))), optional(2, "slide", Types.StringType.get()) ); From b1282833cc4f7c721fdc9c36d44b7d125ca14356 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Nov 2019 15:04:21 -0700 Subject: [PATCH 6/8] Fix complexity. --- .../avro/AvroSchemaWithTypeVisitor.java | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java index d99693d7667a..95c29e7b2554 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -35,34 +35,13 @@ public static T visit(org.apache.iceberg.Schema iSchema, Schema schema, Avro public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor visitor) { switch (schema.getType()) { case RECORD: - Types.StructType struct = iType != null ? iType.asStructType() : null; - return visitRecord(struct, schema, visitor); + return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor); case UNION: - List types = schema.getTypes(); - List options = Lists.newArrayListWithExpectedSize(types.size()); - for (Schema type : types) { - if (type.getType() == Schema.Type.NULL) { - options.add(visit((Type) null, type, visitor)); - } else { - options.add(visit(iType, type, visitor)); - } - } - return visitor.union(iType, schema, options); + return visitUnion(iType, schema, visitor); case ARRAY: - if (schema.getLogicalType() instanceof LogicalMap) { - Types.MapType map = iType != null ? iType.asMapType() : null; - List keyValueFields = schema.getElementType().getFields(); - return visitor.map(map, schema, - visit(map != null ? map.keyType() : null, keyValueFields.get(0).schema(), visitor), - visit(map != null ? map.valueType() : null, keyValueFields.get(1).schema(), visitor)); - - } else { - Types.ListType list = iType != null ? iType.asListType() : null; - return visitor.array(list, schema, - visit(list != null ? list.elementType() : null, schema.getElementType(), visitor)); - } + return visitArray(iType, schema, visitor); case MAP: Types.MapType map = iType != null ? iType.asMapType() : null; @@ -97,6 +76,34 @@ private static T visitRecord(Types.StructType struct, Schema record, AvroSch return visitor.record(struct, record, names, results); } + private static T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor visitor) { + List types = union.getTypes(); + List options = Lists.newArrayListWithExpectedSize(types.size()); + for (Schema branch : types) { + if (branch.getType() == Schema.Type.NULL) { + options.add(visit((Type) null, branch, visitor)); + } else { + options.add(visit(type, branch, visitor)); + } + } + return visitor.union(type, union, options); + } + + private static T visitArray(Type type, Schema array, AvroSchemaWithTypeVisitor visitor) { + if (array.getLogicalType() instanceof LogicalMap) { + Types.MapType map = type != null ? type.asMapType() : null; + List keyValueFields = array.getElementType().getFields(); + return visitor.map(map, array, + visit(map != null ? map.keyType() : null, keyValueFields.get(0).schema(), visitor), + visit(map != null ? map.valueType() : null, keyValueFields.get(1).schema(), visitor)); + + } else { + Types.ListType list = type != null ? type.asListType() : null; + return visitor.array(list, array, + visit(list != null ? list.elementType() : null, array.getElementType(), visitor)); + } + } + private Deque recordLevels = Lists.newLinkedList(); public T record(Types.StructType iStruct, Schema record, List names, List fields) { From eca274a2f67d38db21c08b0916edf2ef0023d77b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Nov 2019 15:16:45 -0700 Subject: [PATCH 7/8] Fixup DataReader imports. --- data/src/main/java/org/apache/iceberg/data/avro/DataReader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java index 0b132531ebcf..f975aa8a3f75 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -33,7 +33,6 @@ import org.apache.avro.io.ResolvingDecoder; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor; -import org.apache.iceberg.avro.LogicalMap; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.exceptions.RuntimeIOException; From 7d762042ea7611c16a9601115594ad20b3ee4c6d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 8 Nov 2019 16:34:47 -0800 Subject: [PATCH 8/8] Update for review comments. --- .../org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java | 5 ++++- .../java/org/apache/iceberg/data/TestReadProjection.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java index 95c29e7b2554..d6e078a6838b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -90,7 +90,10 @@ private static T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisit } private static T visitArray(Type type, Schema array, AvroSchemaWithTypeVisitor visitor) { - if (array.getLogicalType() instanceof LogicalMap) { + if (array.getLogicalType() instanceof LogicalMap || (type != null && type.isMapType())) { + Preconditions.checkState( + AvroSchemaUtil.isKeyValueSchema(array.getElementType()), + "Cannot visit invalid logical map type: %s", array); Types.MapType map = type != null ? type.asMapType() : null; List keyValueFields = array.getElementType().getFields(); return visitor.map(map, array, diff --git a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java index e22b4b7c6089..608e8fa22483 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java +++ b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java @@ -81,7 +81,7 @@ public void testSpecialCharacterProjection() throws Exception { 0, Comparators.charSequences().compare("test", (CharSequence) full.getField("data%0"))); - Record projected = writeAndRead("full_projection", schema, schema.select("data%0"), record); + Record projected = writeAndRead("special_characters", schema, schema.select("data%0"), record); Assert.assertNull("Should not contain id value", projected.getField("user id")); Assert.assertEquals("Should contain the correct data value",