diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 689a1a8ca126..24716e344c4c 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -235,7 +235,7 @@ public ReadBuilder rename(String fullName, String newName) { return this; } - public ReadBuilder nameMapping(NameMapping newNameMapping) { + public ReadBuilder withNameMapping(NameMapping newNameMapping) { this.nameMapping = newNameMapping; return this; } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index 02025a74f6da..3a3c28cad84f 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -315,7 +315,7 @@ private Record writeAndRead(Schema writeSchema, Iterable records = Avro.read(Files.localInput(file)) .project(readSchema) - .nameMapping(nameMapping) + .withNameMapping(nameMapping) .build(); return Iterables.getOnlyElement(records); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java new file mode 100644 index 000000000000..85d612216723 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.parquet; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.parquet.Preconditions; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +class ApplyNameMapping extends ParquetTypeVisitor { + private final NameMapping nameMapping; + + ApplyNameMapping(NameMapping nameMapping) { + this.nameMapping = nameMapping; + } + + @Override + public Type message(MessageType message, List fields) { + Types.MessageTypeBuilder builder = org.apache.parquet.schema.Types.buildMessage(); + fields.stream().filter(Objects::nonNull).forEach(builder::addField); + + return builder.named(message.getName()); + } + + @Override + public Type struct(GroupType struct, List types) { + MappedField field = nameMapping.find(currentPath()); + List actualTypes = types.stream().filter(Objects::nonNull).collect(Collectors.toList()); + Type structType = struct.withNewFields(actualTypes); + + return field == null ? 
structType : structType.withId(field.id()); + } + + @Override + public Type list(GroupType list, Type elementType) { + Preconditions.checkArgument(elementType != null, + "List type must have element field"); + + MappedField field = nameMapping.find(currentPath()); + Type listType = org.apache.parquet.schema.Types.list(list.getRepetition()) + .element(elementType) + .named(list.getName()); + + return field == null ? listType : listType.withId(field.id()); + } + + @Override + public Type map(GroupType map, Type keyType, Type valueType) { + Preconditions.checkArgument(keyType != null && valueType != null, + "Map type must have both key field and value field"); + + MappedField field = nameMapping.find(currentPath()); + Type mapType = org.apache.parquet.schema.Types.map(map.getRepetition()) + .key(keyType) + .value(valueType) + .named(map.getName()); + + return field == null ? mapType : mapType.withId(field.id()); + } + + @Override + public Type primitive(PrimitiveType primitive) { + MappedField field = nameMapping.find(currentPath()); + return field == null ? primitive : primitive.withId(field.id()); + } + + @Override + public void beforeRepeatedElement(Type element) { + // do not add the repeated element's name + } + + @Override + public void afterRepeatedElement(Type element) { + // do not remove the repeated element's name + } + + @Override + public void beforeRepeatedKeyValue(Type keyValue) { + // do not add the repeated element's name + } + + @Override + public void afterRepeatedKeyValue(Type keyValue) { + // do not remove the repeated element's name + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index df23785e9bfc..fa6c80da20a3 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -38,6 +38,7 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -312,6 +313,7 @@ public static class ReadBuilder { private boolean callInit = false; private boolean reuseContainers = false; private int maxRecordsPerBatch = 10000; + private NameMapping nameMapping = null; private ReadBuilder(InputFile file) { this.file = file; @@ -393,6 +395,11 @@ public ReadBuilder recordsPerBatch(int numRowsPerBatch) { return this; } + public ReadBuilder withNameMapping(NameMapping newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) public CloseableIterable build() { if (readerFunc != null || batchedReaderFunc != null) { @@ -419,11 +426,11 @@ public CloseableIterable build() { ParquetReadOptions options = optionsBuilder.build(); if (batchedReaderFunc != null) { - return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers, - caseSensitive, maxRecordsPerBatch); + return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, nameMapping, filter, + reuseContainers, caseSensitive, maxRecordsPerBatch); } else { return new org.apache.iceberg.parquet.ParquetReader<>( - file, schema, options, readerFunc, filter, reuseContainers, caseSensitive); + file, schema, options, 
readerFunc, nameMapping, filter, reuseContainers, caseSensitive); } } @@ -475,6 +482,10 @@ public CloseableIterable build() { builder.withFileRange(start, start + length); } + if (nameMapping != null) { + builder.withNameMapping(nameMapping); + } + return new ParquetIterable<>(builder); } } @@ -483,6 +494,7 @@ private static class ParquetReadBuilder extends ParquetReader.Builder { private Schema schema = null; private ReadSupport readSupport = null; private boolean callInit = false; + private NameMapping nameMapping = null; private ParquetReadBuilder(org.apache.parquet.io.InputFile file) { super(file); @@ -493,6 +505,11 @@ public ParquetReadBuilder project(Schema newSchema) { return this; } + public ParquetReadBuilder withNameMapping(NameMapping newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + public ParquetReadBuilder readSupport(ReadSupport newReadSupport) { this.readSupport = newReadSupport; return this; @@ -505,7 +522,7 @@ public ParquetReadBuilder callInit() { @Override protected ReadSupport getReadSupport() { - return new ParquetReadSupport<>(schema, readSupport, callInit); + return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping); } } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java index bedc91a10b58..645b3fa0d1fb 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Schema; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.parquet.avro.AvroReadSupport; @@ -41,11 +42,13 @@ class ParquetReadSupport extends ReadSupport { private final Schema expectedSchema; private final ReadSupport wrapped; private final boolean callInit; + private final NameMapping nameMapping; - ParquetReadSupport(Schema expectedSchema, ReadSupport readSupport, boolean callInit) { + ParquetReadSupport(Schema expectedSchema, ReadSupport readSupport, boolean callInit, NameMapping nameMapping) { this.expectedSchema = expectedSchema; this.wrapped = readSupport; this.callInit = callInit; + this.nameMapping = nameMapping; } @Override @@ -55,9 +58,15 @@ public ReadContext init(Configuration configuration, Map keyValu // matching to the file's columns by full path, so this must select columns by using the path // in the file's schema. - MessageType projection = ParquetSchemaUtil.hasIds(fileSchema) ? 
- ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) : - ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema); + MessageType projection; + if (ParquetSchemaUtil.hasIds(fileSchema)) { + projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema); + } else if (nameMapping != null) { + MessageType typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping); + projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema); + } else { + projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema); + } // override some known backward-compatibility options configuration.set("parquet.strict.typing", "false"); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index 17d96c153db0..d61e4f4075e4 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; @@ -42,9 +43,10 @@ public class ParquetReader extends CloseableGroup implements CloseableIterabl private final Expression filter; private final boolean reuseContainers; private final boolean caseSensitive; + private final NameMapping nameMapping; public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options, - Function> readerFunc, + Function> readerFunc, NameMapping nameMapping, Expression filter, boolean reuseContainers, boolean caseSensitive) { this.input = input; this.expectedSchema = expectedSchema; @@ -54,6 +56,7 @@ public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions this.filter = filter == Expressions.alwaysTrue() ? 
null : filter; this.reuseContainers = reuseContainers; this.caseSensitive = caseSensitive; + this.nameMapping = nameMapping; } private ReadConf conf = null; @@ -61,7 +64,8 @@ public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions private ReadConf init() { if (conf == null) { ReadConf readConf = new ReadConf<>( - input, options, expectedSchema, filter, readerFunc, null, reuseContainers, caseSensitive, null); + input, options, expectedSchema, filter, readerFunc, null, nameMapping, reuseContainers, + caseSensitive, null); this.conf = readConf.copy(); return readConf; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java index 86dcf6bd95e3..2460096aae95 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java @@ -19,12 +19,16 @@ package org.apache.iceberg.parquet; +import java.util.List; import java.util.Set; import org.apache.iceberg.Schema; +import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types.MessageTypeBuilder; @@ -83,22 +87,7 @@ public static MessageType pruneColumnsFallback(MessageType fileSchema, Schema ex } public static boolean hasIds(MessageType fileSchema) { - try { - // Try to convert the type to Iceberg. If an ID assignment is needed, return false. - ParquetTypeVisitor.visit(fileSchema, new MessageTypeToType(fileSchema) { - @Override - protected int nextId() { - throw new IllegalStateException("Needed to assign ID"); - } - }); - - // no assignment was needed - return true; - - } catch (IllegalStateException e) { - // at least one field was missing an id. 
- return false; - } + return ParquetTypeVisitor.visit(fileSchema, new HasIds()); } public static MessageType addFallbackIds(MessageType fileSchema) { @@ -112,4 +101,41 @@ public static MessageType addFallbackIds(MessageType fileSchema) { return builder.named(fileSchema.getName()); } + + public static MessageType applyNameMapping(MessageType fileSchema, NameMapping nameMapping) { + return (MessageType) ParquetTypeVisitor.visit(fileSchema, new ApplyNameMapping(nameMapping)); + } + + public static class HasIds extends ParquetTypeVisitor { + @Override + public Boolean message(MessageType message, List fields) { + return struct(message, fields); + } + + @Override + public Boolean struct(GroupType struct, List hasIds) { + for (Boolean hasId : hasIds) { + if (hasId) { + return true; + } + } + return struct.getId() != null; + } + + @Override + public Boolean list(GroupType array, Boolean hasId) { + return hasId || array.getId() != null; + } + + @Override + public Boolean map(GroupType map, Boolean keyHasId, Boolean valueHasId) { + return keyHasId || valueHasId || map.getId() != null; + } + + @Override + public Boolean primitive(PrimitiveType primitive) { + return primitive.getId() != null; + } + } + } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java index bd5db4016e92..d05db4625776 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Set; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; @@ -45,7 +44,8 @@ public Type message(MessageType message, List fields) { for (int i = 0; i < fields.size(); i += 1) { Type originalField = message.getType(i); Type field = fields.get(i); - if (selectedIds.contains(getId(originalField))) { + Integer fieldId = getId(originalField); + if (fieldId != null && selectedIds.contains(fieldId)) { builder.addField(originalField); fieldCount += 1; } else if (field != null) { @@ -71,7 +71,8 @@ public Type struct(GroupType struct, List fields) { for (int i = 0; i < fields.size(); i += 1) { Type originalField = struct.getType(i); Type field = fields.get(i); - if (selectedIds.contains(getId(originalField))) { + Integer fieldId = getId(originalField); + if (fieldId != null && selectedIds.contains(fieldId)) { filteredFields.add(originalField); } else if (field != null) { filteredFields.add(originalField); @@ -94,17 +95,18 @@ public Type struct(GroupType struct, List fields) { public Type list(GroupType list, Type element) { GroupType repeated = list.getType(0).asGroupType(); Type originalElement = repeated.getType(0); - int elementId = getId(originalElement); + Integer elementId = getId(originalElement); - if (selectedIds.contains(elementId)) { + if (elementId != null && selectedIds.contains(elementId)) { return list; } else if (element != null) { if (element != originalElement) { + Integer listId = getId(list); // the element type was projected - return Types.list(list.getRepetition()) + Type listType = Types.list(list.getRepetition()) .element(element) - .id(getId(list)) .named(list.getName()); + return listId == null ? 
listType : listType.withId(listId); } return list; } @@ -118,18 +120,20 @@ public Type map(GroupType map, Type key, Type value) { Type originalKey = repeated.getType(0); Type originalValue = repeated.getType(1); - int keyId = getId(originalKey); - int valueId = getId(originalValue); + Integer keyId = getId(originalKey); + Integer valueId = getId(originalValue); - if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { + if ((keyId != null && selectedIds.contains(keyId)) || (valueId != null && selectedIds.contains(valueId))) { return map; } else if (value != null) { + Integer mapId = getId(map); if (value != originalValue) { - return Types.map(map.getRepetition()) + Type mapType = Types.map(map.getRepetition()) .key(originalKey) .value(value) - .id(getId(map)) .named(map.getName()); + + return mapId == null ? mapType : mapType.withId(mapId); } return map; } @@ -142,8 +146,7 @@ public Type primitive(PrimitiveType primitive) { return null; } - private int getId(Type type) { - Preconditions.checkNotNull(type.getId(), "Missing id for type: %s", type); - return type.getId().intValue(); + private Integer getId(Type type) { + return type.getId() == null ? null : type.getId().intValue(); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index e25730ba3871..85f9eb2dae24 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -25,11 +25,11 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import javax.annotation.Nullable; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.parquet.ParquetReadOptions; @@ -49,15 +49,12 @@ class ReadConf { private final InputFile file; private final ParquetReadOptions options; private final MessageType projection; - @Nullable private final ParquetValueReader model; - @Nullable private final VectorizedReader vectorizedModel; private final List rowGroups; private final boolean[] shouldSkip; private final long totalValues; private final boolean reuseContainers; - @Nullable private final Integer batchSize; // List of column chunk metadata for each row group @@ -66,19 +63,25 @@ class ReadConf { @SuppressWarnings("unchecked") ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter, Function> readerFunc, Function> batchedReaderFunc, boolean reuseContainers, + VectorizedReader> batchedReaderFunc, NameMapping nameMapping, boolean reuseContainers, boolean caseSensitive, Integer bSize) { this.file = file; this.options = options; this.reader = newReader(file, options); MessageType fileSchema = reader.getFileMetaData().getSchema(); - boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema); - MessageType typeWithIds = hasIds ? 
fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema); + MessageType typeWithIds; + if (ParquetSchemaUtil.hasIds(fileSchema)) { + typeWithIds = fileSchema; + this.projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema); + } else if (nameMapping != null) { + typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping); + this.projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema); + } else { + typeWithIds = ParquetSchemaUtil.addFallbackIds(fileSchema); + this.projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema); + } - this.projection = hasIds ? - ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) : - ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema); this.rowGroups = reader.getRowGroups(); this.shouldSkip = new boolean[rowGroups.size()]; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 6cb9da574caa..481012cb8bbe 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -32,6 +32,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; @@ -48,11 +49,12 @@ public class VectorizedParquetReader extends CloseableGroup implements Closea private boolean reuseContainers; private final boolean caseSensitive; private final int batchSize; + private final NameMapping nameMapping; public VectorizedParquetReader( InputFile input, Schema expectedSchema, ParquetReadOptions options, - Function> readerFunc, - Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) { + Function> readerFunc, NameMapping nameMapping, Expression filter, + boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) { this.input = input; this.expectedSchema = expectedSchema; this.options = options; @@ -62,6 +64,7 @@ public VectorizedParquetReader( this.reuseContainers = reuseContainers; this.caseSensitive = caseSensitive; this.batchSize = maxRecordsPerBatch; + this.nameMapping = nameMapping; } private ReadConf conf = null; @@ -69,7 +72,8 @@ public VectorizedParquetReader( private ReadConf init() { if (conf == null) { ReadConf readConf = new ReadConf( - input, options, expectedSchema, filter, null, batchReaderFunc, reuseContainers, caseSensitive, batchSize); + input, options, expectedSchema, filter, null, batchReaderFunc, nameMapping, reuseContainers, + caseSensitive, batchSize); this.conf = readConf.copy(); return readConf; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java new file mode 100644 index 000000000000..f61ca5ebff5c --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.parquet; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.Schema; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.parquet.schema.MessageType; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestParquetSchemaUtil { + private static final Types.StructType SUPPORTED_PRIMITIVES = Types.StructType.of( + required(100, "id", Types.LongType.get()), + optional(101, "data", Types.StringType.get()), + required(102, "b", Types.BooleanType.get()), + optional(103, "i", Types.IntegerType.get()), + required(104, "l", Types.LongType.get()), + optional(105, "f", Types.FloatType.get()), + required(106, "d", Types.DoubleType.get()), + optional(107, "date", Types.DateType.get()), + required(108, "ts", Types.TimestampType.withZone()), + required(110, "s", Types.StringType.get()), + required(112, "fixed", Types.FixedType.ofLength(7)), + optional(113, "bytes", Types.BinaryType.get()), + required(114, "dec_9_0", Types.DecimalType.of(9, 0)), + required(115, "dec_11_2", Types.DecimalType.of(11, 2)), + required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's maximum precision + ); + + @Test + public void testAssignIdsByNameMapping() { + Types.StructType structType = Types.StructType.of( + required(0, "id", Types.LongType.get()), + optional(1, "list_of_maps", + Types.ListType.ofOptional(2, Types.MapType.ofOptional(3, 4, + Types.StringType.get(), + SUPPORTED_PRIMITIVES))), + optional(5, "map_of_lists", + Types.MapType.ofOptional(6, 7, + Types.StringType.get(), + Types.ListType.ofOptional(8, SUPPORTED_PRIMITIVES))), + required(9, "list_of_lists", + Types.ListType.ofOptional(10, Types.ListType.ofOptional(11, SUPPORTED_PRIMITIVES))), + required(12, "map_of_maps", + Types.MapType.ofOptional(13, 14, + Types.StringType.get(), + Types.MapType.ofOptional(15, 16, + Types.StringType.get(), + SUPPORTED_PRIMITIVES))), + required(17, "list_of_struct_of_nested_types", Types.ListType.ofOptional(19, Types.StructType.of( + Types.NestedField.required(20, "m1", Types.MapType.ofOptional(21, 22, + Types.StringType.get(), + SUPPORTED_PRIMITIVES)), + Types.NestedField.optional(23, "l1", Types.ListType.ofRequired(24, SUPPORTED_PRIMITIVES)), + Types.NestedField.required(25, "l2", Types.ListType.ofRequired(26, SUPPORTED_PRIMITIVES)), + Types.NestedField.optional(27, "m2", Types.MapType.ofOptional(28, 29, + Types.StringType.get(), + SUPPORTED_PRIMITIVES)) + ))) + ); + + Schema schema = new Schema(TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet) + .asStructType().fields()); + NameMapping nameMapping = MappingUtil.create(schema); + MessageType messageType = 
ParquetSchemaUtil.convert(schema, "complex_schema"); + MessageType typeWithIdsFromNameMapping = ParquetSchemaUtil.applyNameMapping(messageType, nameMapping); + Schema newSchema = ParquetSchemaUtil.convert(typeWithIdsFromNameMapping); + + Assert.assertEquals(schema.asStruct(), newSchema.asStruct()); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 2baf59ea6f08..51ddc9432bc3 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -142,9 +142,11 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, for (int i = 0; i < fields.size(); i += 1) { Type fieldType = fields.get(i); int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; - int id = fieldType.getId().intValue(); - readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); - typesById.put(id, fieldType); + if (fieldType.getId() != null) { + int id = fieldType.getId().intValue(); + readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); + typesById.put(id, fieldType); + } } List expectedFields = expected != null ? diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 01cbe6f286ad..3eb55ebaf22a 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -78,6 +78,7 @@ public VectorizedReader message( List fields = groupType.getFields(); IntStream.range(0, fields.size()) + .filter(pos -> fields.get(pos).getId() != null) .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos))); List icebergFields = expected != null ? 
@@ -114,6 +115,9 @@ public VectorizedReader primitive( PrimitiveType primitive) { // Create arrow vector for this field + if (primitive.getId() == null) { + return null; + } int parquetFieldId = primitive.getId().intValue(); ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath()); // Nested types not yet supported for vectorized reads diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index eeb3ad559858..f784b638d376 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; @@ -36,14 +37,16 @@ class BatchDataReader extends BaseDataReader { private final Schema expectedSchema; + private final String nameMapping; private final boolean caseSensitive; private final int batchSize; BatchDataReader( - CombinedScanTask task, Schema expectedSchema, FileIO fileIo, + CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo, EncryptionManager encryptionManager, boolean caseSensitive, int size) { super(task, fileIo, encryptionManager); this.expectedSchema = expectedSchema; + this.nameMapping = nameMapping; this.caseSensitive = caseSensitive; this.batchSize = size; } @@ -54,7 +57,7 @@ CloseableIterator open(FileScanTask task) { InputFile location = getInputFile(task); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); if (task.file().format() == FileFormat.PARQUET) { - iter = Parquet.read(location) + Parquet.ReadBuilder builder = Parquet.read(location) .project(expectedSchema) .split(task.start(), task.length()) .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema, @@ -65,8 +68,13 @@ CloseableIterator open(FileScanTask task) { // Spark eagerly consumes the batches. So the underlying memory allocated could be reused // without worrying about subsequent reads clobbering over each other. This improves // read performance as every batch read doesn't have to pay the cost of allocating memory. 
- .reuseContainers() - .build(); + .reuseContainers(); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + iter = builder.build(); } else { throw new UnsupportedOperationException( "Format: " + task.file().format() + " not supported for batched reads"); diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java index d205c22a77f6..a9faf0021fb4 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -44,6 +44,7 @@ import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -70,6 +71,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics { private static final Logger LOG = LoggerFactory.getLogger(Reader.class); @@ -202,12 +205,13 @@ public List> planBatchInputPartitions() { Preconditions.checkState(batchSize > 0, "Invalid batch size"); String tableSchemaString = SchemaParser.toJson(table.schema()); String expectedSchemaString = SchemaParser.toJson(lazySchema()); + String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING); List> readTasks = Lists.newArrayList(); for (CombinedScanTask task : tasks()) { readTasks.add(new ReadTask<>( - task, tableSchemaString, expectedSchemaString, io, encryptionManager, caseSensitive, localityPreferred, - new BatchReaderFactory(batchSize))); + task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive, + localityPreferred, new BatchReaderFactory(batchSize))); } LOG.info("Batching input partitions with {} tasks.", readTasks.size()); @@ -221,12 +225,13 @@ public List> planBatchInputPartitions() { public List> planInputPartitions() { String tableSchemaString = SchemaParser.toJson(table.schema()); String expectedSchemaString = SchemaParser.toJson(lazySchema()); + String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING); List> readTasks = Lists.newArrayList(); for (CombinedScanTask task : tasks()) { readTasks.add(new ReadTask<>( - task, tableSchemaString, expectedSchemaString, io, encryptionManager, caseSensitive, localityPreferred, - InternalRowReaderFactory.INSTANCE)); + task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive, + localityPreferred, InternalRowReaderFactory.INSTANCE)); } return readTasks; @@ -382,6 +387,7 @@ private static class ReadTask implements Serializable, InputPartition { private final CombinedScanTask task; private final String tableSchemaString; private final String expectedSchemaString; + private final String nameMappingString; private final Broadcast io; private final Broadcast encryptionManager; private final boolean caseSensitive; @@ -390,10 +396,11 @@ private static class ReadTask implements Serializable, InputPartition { private transient Schema tableSchema = null; private transient Schema expectedSchema = null; + private transient 
NameMapping nameMapping = null; private transient String[] preferredLocations; private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString, - Broadcast io, Broadcast encryptionManager, + String nameMappingString, Broadcast io, Broadcast encryptionManager, boolean caseSensitive, boolean localityPreferred, ReaderFactory readerFactory) { this.task = task; this.tableSchemaString = tableSchemaString; @@ -404,11 +411,12 @@ private ReadTask(CombinedScanTask task, String tableSchemaString, String expecte this.localityPreferred = localityPreferred; this.preferredLocations = getPreferredLocations(); this.readerFactory = readerFactory; + this.nameMappingString = nameMappingString; } @Override public InputPartitionReader createPartitionReader() { - return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), io.value(), + return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(), encryptionManager.value(), caseSensitive); } @@ -442,7 +450,8 @@ private String[] getPreferredLocations() { } private interface ReaderFactory extends Serializable { - InputPartitionReader create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO io, + InputPartitionReader create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, + String nameMapping, FileIO io, EncryptionManager encryptionManager, boolean caseSensitive); } @@ -454,9 +463,9 @@ private InternalRowReaderFactory() { @Override public InputPartitionReader create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, - FileIO io, EncryptionManager encryptionManager, - boolean caseSensitive) { - return new RowDataReader(task, tableSchema, expectedSchema, io, encryptionManager, caseSensitive); + String nameMapping, FileIO io, + EncryptionManager encryptionManager, boolean caseSensitive) { + return new RowDataReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive); } } @@ -469,9 +478,9 @@ private static class BatchReaderFactory implements ReaderFactory @Override public InputPartitionReader create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, - FileIO io, EncryptionManager encryptionManager, - boolean caseSensitive) { - return new BatchDataReader(task, expectedSchema, io, encryptionManager, caseSensitive, batchSize); + String nameMapping, FileIO io, + EncryptionManager encryptionManager, boolean caseSensitive) { + return new BatchDataReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index c0e46ebd0bec..fb0b43dac2a8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -39,6 +39,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -71,14 +72,16 @@ class RowDataReader extends BaseDataReader { private final Schema tableSchema; private final Schema expectedSchema; + private final String nameMapping; private final boolean caseSensitive; RowDataReader( - CombinedScanTask task, Schema 
tableSchema, Schema expectedSchema, FileIO fileIo, + CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO fileIo, EncryptionManager encryptionManager, boolean caseSensitive) { super(task, fileIo, encryptionManager); this.tableSchema = tableSchema; this.expectedSchema = expectedSchema; + this.nameMapping = nameMapping; this.caseSensitive = caseSensitive; } @@ -151,13 +154,18 @@ private CloseableIterable newParquetIterable( FileScanTask task, Schema readSchema, Map idToConstant) { - return Parquet.read(location) - .project(readSchema) + Parquet.ReadBuilder builder = Parquet.read(location) .split(task.start(), task.length()) + .project(readSchema) .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) .filter(task.residual()) - .caseSensitive(caseSensitive) - .build(); + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); } private CloseableIterable newOrcIterable( diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java index 0a568740caf5..384a95bf631c 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java @@ -42,6 +42,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + public class RowDataRewriter implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class); @@ -49,6 +51,7 @@ public class RowDataRewriter implements Serializable { private final Broadcast fileIO; private final Broadcast encryptionManager; private final String tableSchema; + private final String nameMapping; private final Writer.WriterFactory writerFactory; private final boolean caseSensitive; @@ -60,6 +63,7 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive, this.caseSensitive = caseSensitive; this.tableSchema = SchemaParser.toJson(table.schema()); + this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING); String formatString = table.properties().getOrDefault( TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); @@ -80,7 +84,8 @@ private Writer.TaskCommit rewriteDataForTask(CombinedScanTask task) throws Excep TaskContext context = TaskContext.get(); RowDataReader dataReader = new RowDataReader(task, SchemaParser.fromJson(tableSchema), - SchemaParser.fromJson(tableSchema), fileIO.value(), encryptionManager.value(), caseSensitive); + SchemaParser.fromJson(tableSchema), nameMapping, fileIO.value(), + encryptionManager.value(), caseSensitive); int partitionId = context.partitionId(); long taskId = context.taskAttemptId(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java index 4f86fa8a2509..37f57d483364 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java @@ -22,17 +22,23 @@ import java.io.File; import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveTableBaseTest; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; @@ -48,6 +54,10 @@ import org.junit.rules.TemporaryFolder; import scala.collection.Seq; +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; +import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED; +import static org.apache.iceberg.types.Types.NestedField.optional; + public class TestSparkTableUtil extends HiveTableBaseTest { private static final Configuration CONF = HiveTableBaseTest.hiveConf; private static final String tableName = "hive_table"; @@ -200,4 +210,92 @@ public void testImportAsHiveTable() throws Exception { long count2 = spark.read().format("iceberg").load(DB_NAME + ".test_partitioned_table").count(); Assert.assertEquals("three values ", 3, count2); } + + @Test + public void testImportWithNameMapping() throws Exception { + spark.table(qualifiedTableName).write().mode("overwrite").format("parquet") + .saveAsTable("original_table"); + + // The imported files have no field IDs, so the read must project this field via the name mapping + Schema filteredSchema = new Schema( + optional(1, "data", Types.StringType.get()) + ); + + NameMapping nameMapping = MappingUtil.create(filteredSchema); + + TableIdentifier source = new TableIdentifier("original_table"); + Table table = catalog.createTable( + org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "target_table"), + filteredSchema, + SparkSchemaUtil.specForTable(spark, "original_table")); + + table.updateProperties().set(DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping)).commit(); + + File stagingDir = temp.newFolder("staging-dir"); + SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString()); + + // The filter exercises the metrics/dictionary row group filters, which project the file schema + // with the name mapping again to match the metrics read from the footer. 
+ List<String> actual = spark.read().format("iceberg").load(DB_NAME + ".target_table") + .select("data") + .sort("data") + .filter("data<'c'") + .collectAsList() + .stream() + .map(r -> r.getString(0)) + .collect(Collectors.toList()); + + List<SimpleRecord> expected = Lists.newArrayList( + new SimpleRecord(2, "a"), + new SimpleRecord(1, "b") + ); + + Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()), actual); + } + + @Test + public void testImportWithNameMappingForVectorizedParquetReader() throws Exception { + spark.table(qualifiedTableName).write().mode("overwrite").format("parquet") + .saveAsTable("original_table"); + + // The imported files have no field IDs, so the read must project this field via the name mapping + Schema filteredSchema = new Schema( + optional(1, "data", Types.StringType.get()) + ); + + NameMapping nameMapping = MappingUtil.create(filteredSchema); + + TableIdentifier source = new TableIdentifier("original_table"); + Table table = catalog.createTable( + org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "target_table_for_vectorization"), + filteredSchema, + SparkSchemaUtil.specForTable(spark, "original_table")); + + table.updateProperties() + .set(DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping)) + .set(PARQUET_VECTORIZATION_ENABLED, "true") + .commit(); + + File stagingDir = temp.newFolder("staging-dir"); + SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString()); + + // The filter exercises the metrics/dictionary row group filters, which project the file schema + // with the name mapping again to match the metrics read from the footer. + List<String> actual = spark.read().format("iceberg") + .load(DB_NAME + ".target_table_for_vectorization") + .select("data") + .sort("data") + .filter("data<'c'") + .collectAsList() + .stream() + .map(r -> r.getString(0)) + .collect(Collectors.toList()); + + List<SimpleRecord> expected = Lists.newArrayList( + new SimpleRecord(2, "a"), + new SimpleRecord(1, "b") + ); + + Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()), actual); + } }
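
Reviewer note: the sketch below is not part of the patch; it is a minimal illustration of how the pieces added above fit together. It assumes a hypothetical `table` (an Iceberg Table whose Parquet data files were written without Iceberg field IDs) and a hypothetical `dataFile` pointing at one of those files; the class and method names are illustrative. The calls it uses (MappingUtil.create, NameMappingParser.toJson, TableProperties.DEFAULT_NAME_MAPPING, and the new Parquet.ReadBuilder withNameMapping) are the ones introduced or exercised by this diff.

import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.Parquet;

public class NameMappingUsageSketch {

  static CloseableIterable<?> readWithNameMapping(Table table, File dataFile) {
    Schema expected = table.schema();

    // Derive a mapping from the table schema and persist it as the table's default
    // name mapping, the same way the new TestSparkTableUtil tests do.
    NameMapping mapping = MappingUtil.create(expected);
    table.updateProperties()
        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
        .commit();

    // Hand the mapping to the Parquet read builder. For files without field IDs,
    // ParquetReadSupport/ReadConf now assign IDs by column name through
    // ParquetSchemaUtil.applyNameMapping instead of falling back to positional IDs.
    return Parquet.read(Files.localInput(dataFile))
        .project(expected)
        .withNameMapping(mapping)
        .build();
  }
}

Avro.ReadBuilder gets the matching rename in this diff (nameMapping(...) becomes withNameMapping(...)), so non-Parquet callers only need the one-word change shown in TestAvroNameMapping.writeAndRead.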