diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 760b80588cd5..d7341291da33 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Locale; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; @@ -174,7 +175,9 @@ public static class ReadBuilder { private NameMapping nameMapping; private boolean reuseContainers = false; private org.apache.iceberg.Schema schema = null; - private Function> createReaderFunc = readSchema -> { + private Function> createReaderFunc = null; + private BiFunction> createReaderBiFunc = null; + private final Function> defaultCreateReaderFunc = readSchema -> { GenericAvroReader reader = new GenericAvroReader<>(readSchema); reader.setClassLoader(defaultLoader); return reader; @@ -188,10 +191,17 @@ private ReadBuilder(InputFile file) { } public ReadBuilder createReaderFunc(Function> readerFunction) { + Preconditions.checkState(createReaderBiFunc == null, "Cannot set multiple createReaderFunc"); this.createReaderFunc = readerFunction; return this; } + public ReadBuilder createReaderFunc(BiFunction> readerFunction) { + Preconditions.checkState(createReaderFunc == null, "Cannot set multiple createReaderFunc"); + this.createReaderBiFunc = readerFunction; + return this; + } + /** * Restricts the read to the given range: [start, end = start + length). * @@ -232,8 +242,17 @@ public ReadBuilder nameMapping(NameMapping newNameMapping) { public AvroIterable build() { Preconditions.checkNotNull(schema, "Schema is required"); + Function> readerFunc; + if (createReaderBiFunc != null) { + readerFunc = avroSchema -> createReaderBiFunc.apply(schema, avroSchema); + } else if (createReaderFunc != null) { + readerFunc = createReaderFunc; + } else { + readerFunc = defaultCreateReaderFunc; + } + return new AvroIterable<>(file, - new ProjectionDatumReader<>(createReaderFunc, schema, renames, nameMapping), + new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping), start, length, reuseContainers); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index a34dcf553beb..d55ed5a4aa5c 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -347,6 +347,13 @@ static Schema.Field copyField(Schema.Field field, Schema newSchema, String newNa return copy; } + public static String makeCompatibleName(String name) { + if (!validAvroName(name)) { + return sanitize(name); + } + return name; + } + static boolean validAvroName(String name) { int length = name.length(); Preconditions.checkArgument(length > 0, "Empty name"); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java new file mode 100644 index 000000000000..d6e078a6838b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.Deque; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +public abstract class AvroSchemaWithTypeVisitor { + public static T visit(org.apache.iceberg.Schema iSchema, Schema schema, AvroSchemaWithTypeVisitor visitor) { + return visit(iSchema.asStruct(), schema, visitor); + } + + public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor visitor) { + switch (schema.getType()) { + case RECORD: + return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor); + + case UNION: + return visitUnion(iType, schema, visitor); + + case ARRAY: + return visitArray(iType, schema, visitor); + + case MAP: + Types.MapType map = iType != null ? iType.asMapType() : null; + return visitor.map(map, schema, + visit(map != null ? map.valueType() : null, schema.getValueType(), visitor)); + + default: + return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, schema); + } + } + + private static T visitRecord(Types.StructType struct, Schema record, AvroSchemaWithTypeVisitor visitor) { + // check to make sure this hasn't been visited before + String name = record.getFullName(); + Preconditions.checkState(!visitor.recordLevels.contains(name), + "Cannot process recursive Avro record %s", name); + + visitor.recordLevels.push(name); + + List fields = record.getFields(); + List names = Lists.newArrayListWithExpectedSize(fields.size()); + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (Schema.Field field : fields) { + int fieldId = AvroSchemaUtil.getFieldId(field); + Types.NestedField iField = struct != null ? struct.field(fieldId) : null; + names.add(field.name()); + results.add(visit(iField != null ? iField.type() : null, field.schema(), visitor)); + } + + visitor.recordLevels.pop(); + + return visitor.record(struct, record, names, results); + } + + private static T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor visitor) { + List types = union.getTypes(); + List options = Lists.newArrayListWithExpectedSize(types.size()); + for (Schema branch : types) { + if (branch.getType() == Schema.Type.NULL) { + options.add(visit((Type) null, branch, visitor)); + } else { + options.add(visit(type, branch, visitor)); + } + } + return visitor.union(type, union, options); + } + + private static T visitArray(Type type, Schema array, AvroSchemaWithTypeVisitor visitor) { + if (array.getLogicalType() instanceof LogicalMap || (type != null && type.isMapType())) { + Preconditions.checkState( + AvroSchemaUtil.isKeyValueSchema(array.getElementType()), + "Cannot visit invalid logical map type: %s", array); + Types.MapType map = type != null ? type.asMapType() : null; + List keyValueFields = array.getElementType().getFields(); + return visitor.map(map, array, + visit(map != null ? map.keyType() : null, keyValueFields.get(0).schema(), visitor), + visit(map != null ? map.valueType() : null, keyValueFields.get(1).schema(), visitor)); + + } else { + Types.ListType list = type != null ? type.asListType() : null; + return visitor.array(list, array, + visit(list != null ? list.elementType() : null, array.getElementType(), visitor)); + } + } + + private Deque recordLevels = Lists.newLinkedList(); + + public T record(Types.StructType iStruct, Schema record, List names, List fields) { + return null; + } + + public T union(Type iType, Schema union, List options) { + return null; + } + + public T array(Types.ListType iList, Schema array, T element) { + return null; + } + + public T map(Types.MapType iMap, Schema map, T key, T value) { + return null; + } + + public T map(Types.MapType iMap, Schema map, T value) { + return null; + } + + public T primitive(Type.PrimitiveType iPrimitive, Schema primitive) { + return null; + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index 8d18d96ce20d..35b8981ca0fe 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -87,7 +87,7 @@ public Schema record(Schema record, List names, Iterable s hasChange = true; } - Schema.Field avroField = updateMap.get(field.name()); + Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name())); if (avroField != null) { updatedFields.add(avroField); @@ -131,7 +131,7 @@ public Schema.Field field(Schema.Field field, Supplier fieldResult) { if (schema != field.schema() || !expectedName.equals(field.name())) { // add an alias for the field - return AvroSchemaUtil.copyField(field, schema, expectedName); + return AvroSchemaUtil.copyField(field, schema, AvroSchemaUtil.makeCompatibleName(expectedName)); } else { // always copy because fields can't be reused return AvroSchemaUtil.copyField(field, field.schema(), field.name()); diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java index 122173d983f4..f975aa8a3f75 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -32,19 +32,20 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.ResolvingDecoder; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.avro.AvroSchemaVisitor; -import org.apache.iceberg.avro.LogicalMap; +import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; public class DataReader implements DatumReader { private static final ThreadLocal>> DECODER_CACHES = ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap()); - public static DataReader create(Schema readSchema) { - return new DataReader<>(readSchema); + public static DataReader create(org.apache.iceberg.Schema expectedSchema, Schema readSchema) { + return new DataReader<>(expectedSchema, readSchema); } private final Schema readSchema; @@ -52,9 +53,9 @@ public static DataReader create(Schema readSchema) { private Schema fileSchema = null; @SuppressWarnings("unchecked") - private DataReader(Schema readSchema) { + private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) { this.readSchema = readSchema; - this.reader = (ValueReader) AvroSchemaVisitor.visit(readSchema, new ReadBuilder()); + this.reader = (ValueReader) AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder()); } @Override @@ -94,40 +95,39 @@ private ResolvingDecoder newResolver() { } } - private static class ReadBuilder extends AvroSchemaVisitor> { + private static class ReadBuilder extends AvroSchemaWithTypeVisitor> { + private ReadBuilder() { } @Override - public ValueReader record(Schema record, List names, List> fields) { - return GenericReaders.struct(AvroSchemaUtil.convert(record).asStructType(), fields); + public ValueReader record(Types.StructType struct, Schema record, + List names, List> fields) { + return GenericReaders.struct(struct, fields); } @Override - public ValueReader union(Schema union, List> options) { + public ValueReader union(Type ignored, Schema union, List> options) { return ValueReaders.union(options); } @Override - public ValueReader array(Schema array, ValueReader elementReader) { - if (array.getLogicalType() instanceof LogicalMap) { - ValueReaders.StructReader keyValueReader = (ValueReaders.StructReader) elementReader; - ValueReader keyReader = keyValueReader.reader(0); - ValueReader valueReader = keyValueReader.reader(1); - - return ValueReaders.arrayMap(keyReader, valueReader); - } - + public ValueReader array(Types.ListType ignored, Schema array, ValueReader elementReader) { return ValueReaders.array(elementReader); } @Override - public ValueReader map(Schema map, ValueReader valueReader) { + public ValueReader map(Types.MapType iMap, Schema map, ValueReader keyReader, ValueReader valueReader) { + return ValueReaders.arrayMap(keyReader, valueReader); + } + + @Override + public ValueReader map(Types.MapType ignored, Schema map, ValueReader valueReader) { return ValueReaders.map(ValueReaders.strings(), valueReader); } @Override - public ValueReader primitive(Schema primitive) { + public ValueReader primitive(Type.PrimitiveType ignored, Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); if (logicalType != null) { switch (logicalType.getName()) { diff --git a/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java b/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java index 97a50e9c29e7..42b433f53185 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java +++ b/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java @@ -166,7 +166,9 @@ private static class RawDecoder extends MessageDecoder.BaseDecoder { * @param writeSchema the schema used to decode buffers */ private RawDecoder(org.apache.iceberg.Schema readSchema, org.apache.avro.Schema writeSchema) { - this.reader = new ProjectionDatumReader<>(DataReader::create, readSchema, ImmutableMap.of(), null); + this.reader = new ProjectionDatumReader<>( + avroSchema -> DataReader.create(readSchema, avroSchema), + readSchema, ImmutableMap.of(), null); this.reader.setSchema(writeSchema); } diff --git a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java index 54b76d8c8647..608e8fa22483 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java +++ b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java @@ -63,6 +63,32 @@ public void testFullProjection() throws Exception { Assert.assertTrue("Should contain the correct data value", cmp == 0); } + @Test + public void testSpecialCharacterProjection() throws Exception { + Schema schema = new Schema( + Types.NestedField.required(0, "user id", Types.LongType.get()), + Types.NestedField.optional(1, "data%0", Types.StringType.get()) + ); + + Record record = GenericRecord.create(schema.asStruct()); + record.setField("user id", 34L); + record.setField("data%0", "test"); + + Record full = writeAndRead("special_chars", schema, schema, record); + + Assert.assertEquals("Should contain the correct id value", 34L, (long) full.getField("user id")); + Assert.assertEquals("Should contain the correct data value", + 0, + Comparators.charSequences().compare("test", (CharSequence) full.getField("data%0"))); + + Record projected = writeAndRead("special_characters", schema, schema.select("data%0"), record); + + Assert.assertNull("Should not contain id value", projected.getField("user id")); + Assert.assertEquals("Should contain the correct data value", + 0, + Comparators.charSequences().compare("test", (CharSequence) projected.getField("data%0"))); + } + @Test public void testReorderedFullProjection() throws Exception { Schema schema = new Schema( diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index 7581520c354e..275710492bc5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -20,6 +20,7 @@ package org.apache.iceberg.parquet; import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.types.Type.NestedType; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; @@ -58,7 +59,7 @@ public MessageType convert(Schema schema, String name) { builder.addField(field(field)); } - return builder.named(name); + return builder.named(AvroSchemaUtil.makeCompatibleName(name)); } public GroupType struct(StructType struct, Type.Repetition repetition, int id, String name) { @@ -68,7 +69,7 @@ public GroupType struct(StructType struct, Type.Repetition repetition, int id, S builder.addField(field(field)); } - return builder.id(id).named(name); + return builder.id(id).named(AvroSchemaUtil.makeCompatibleName(name)); } public Type field(NestedField field) { @@ -98,7 +99,7 @@ public GroupType list(ListType list, Type.Repetition repetition, int id, String return Types.list(repetition) .element(field(elementField)) .id(id) - .named(name); + .named(AvroSchemaUtil.makeCompatibleName(name)); } public GroupType map(MapType map, Type.Repetition repetition, int id, String name) { @@ -108,10 +109,11 @@ public GroupType map(MapType map, Type.Repetition repetition, int id, String nam .key(field(keyField)) .value(field(valueField)) .id(id) - .named(name); + .named(AvroSchemaUtil.makeCompatibleName(name)); } - public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int id, String name) { + public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int id, String originalName) { + String name = AvroSchemaUtil.makeCompatibleName(originalName); switch (primitive.typeId()) { case BOOLEAN: return Types.primitive(BOOLEAN, repetition).id(id).named(name); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java index 62ba52e5f2b1..1466deab2af2 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java @@ -66,7 +66,8 @@ public class TestParquetAvroReader { optional(19, "renovate", Types.MapType.ofRequired(20, 21, Types.StringType.get(), Types.StructType.of( optional(22, "jumpy", Types.DoubleType.get()), - required(23, "koala", Types.TimeType.get()) + required(23, "koala", Types.TimeType.get()), + required(24, "couch rope", Types.IntegerType.get()) ))), optional(2, "slide", Types.StringType.get()) ); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java index 99fe7a952a6a..0e97c37ffe79 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java @@ -66,7 +66,8 @@ public class TestParquetAvroWriter { optional(19, "renovate", Types.MapType.ofRequired(20, 21, Types.StringType.get(), Types.StructType.of( optional(22, "jumpy", Types.DoubleType.get()), - required(23, "koala", Types.TimeType.get()) + required(23, "koala", Types.TimeType.get()), + required(24, "couch rope", Types.IntegerType.get()) ))), optional(2, "slide", Types.StringType.get()) ); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java index 97fa11e80585..00f95f382a15 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java @@ -62,7 +62,8 @@ public class TestSparkParquetWriter { optional(19, "renovate", Types.MapType.ofRequired(20, 21, Types.StringType.get(), Types.StructType.of( optional(22, "jumpy", Types.DoubleType.get()), - required(23, "koala", Types.IntegerType.get()) + required(23, "koala", Types.IntegerType.get()), + required(24, "couch rope", Types.IntegerType.get()) ))), optional(2, "slide", Types.StringType.get()) );